Aggregation Framework
The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
To learn more about aggregation, see Aggregation Pipeline in the Server manual.
Prerequisites
You must set up the following components to run the code examples in this guide:
A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
The following import statements:
import org.mongodb.scala._
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters, Projections}
Note
This guide uses the Observable implicits, such as printResults(), as covered in the Quick Start Primer.
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.
The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
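MongoClient() with no arguments targets localhost on port 27017. The following sketch assumes the same deployment but spells the address out as a connection string, which is the form you would adapt for another host or port. Creating the client and the database and collection handles does not by itself send any command to the server, so the namespace check below works even before any data is read.

```scala
import org.mongodb.scala._

// Assumption: the same standalone deployment as above, given as an explicit connection string.
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27017")

// getDatabase() and getCollection() only create local handles; nothing is sent to the server.
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

println(collection.namespace.getFullName)

// Release the client's connection pool and monitoring threads when finished.
mongoClient.close()
```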
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Perform Aggregation
To perform aggregation, pass a sequence (Seq) of aggregation stages to the MongoCollection.aggregate() method. The driver provides the Aggregates helper class, which contains builders for aggregation stages.
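Each Aggregates builder returns a single stage as an org.bson.conversions.Bson value, and aggregate() receives the stages as an ordered Seq. The following sketch, which assumes only that the driver is on the classpath (no running server), renders the stages of the example that follows to JSON using the default codec registry from MongoClientSettings.getDefaultCodecRegistry(), so you can inspect roughly what the server receives. The names pipeline and rendered are illustrative.

```scala
import com.mongodb.MongoClientSettings
import org.bson.BsonDocument
import org.bson.conversions.Bson
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}

// Each builder returns one stage as a Bson value; stages run in Seq order.
val pipeline: Seq[Bson] = Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)

// Render each stage with the default codec registry; no server connection is needed.
val rendered: Seq[String] = pipeline.map { stage =>
  stage.toBsonDocument(classOf[BsonDocument], MongoClientSettings.getDefaultCodecRegistry()).toJson()
}

rendered.foreach(println)
```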
In this example, the aggregation pipeline performs the following tasks:
Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.filter() to build the $match stage.
Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. For accumulator expressions used within the $group stage, the driver provides the Accumulators helper class.
collection.aggregate(Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()
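To make the semantics of these two stages concrete, the following sketch models them with plain Scala collections instead of the driver. The Restaurant case class and the sample data are hypothetical and simplified. In particular, the filter here only checks array membership, whereas the server's equality match on categories would also match a document whose categories field is the plain string "Bakery".

```scala
// Hypothetical, simplified stand-in for a restaurant document.
final case class Restaurant(name: String, stars: Int, categories: Seq[String])

// Made-up sample data; not taken from restaurants.json.
val sample: Seq[Restaurant] = Seq(
  Restaurant("Alpha Bakery", 3, Seq("Bakery", "Cafe")),
  Restaurant("Beta Loaves", 4, Seq("Bakery")),
  Restaurant("Gamma Slice", 3, Seq("Pizza", "Bakery")),
  Restaurant("Delta Pizza", 5, Seq("Pizza"))
)

// $match {categories: "Bakery"}: an equality filter on an array field
// matches when any element equals "Bakery".
val matched: Seq[Restaurant] = sample.filter(_.categories.contains("Bakery"))

// $group {_id: "$stars", count: {$sum: 1}}: one result per distinct stars
// value, adding 1 for every matching document in that group.
val countsByStars: Map[Int, Int] =
  matched.groupBy(_.stars).map { case (stars, docs) => stars -> docs.size }

// Sort only for stable printing.
countsByStars.toSeq.sortBy(_._1).foreach { case (stars, count) =>
  println(s"{_id: $stars, count: $count}")
}
```

The sketch sorts before printing because $group does not guarantee any order for its output documents; in a real pipeline, add a $sort stage if the order matters.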
Use Aggregation Expressions
For $group accumulator expressions, the driver provides the Accumulators helper class. For other aggregation expressions, manually build the expression by using the Document class.
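As a sketch of building such expressions by hand, the following code uses the lower-level org.bson classes (BsonDocument, BsonArray, BsonString, BsonInt32) instead of Document, which makes the type of each array element explicit. Alongside the $arrayElemAt expression used in the next example, it adds a $size expression under a hypothetical output field named categoryCount, then renders the resulting $project stage to JSON without contacting a server.

```scala
import com.mongodb.MongoClientSettings
import org.bson.{BsonArray, BsonDocument, BsonInt32, BsonString, BsonValue}
import org.mongodb.scala.model.{Aggregates, Projections}

// {$arrayElemAt: ["$categories", 0]}: the first element of the categories array.
val firstCategoryExpr = new BsonDocument(
  "$arrayElemAt",
  new BsonArray(java.util.Arrays.asList[BsonValue](new BsonString("$categories"), new BsonInt32(0)))
)

// {$size: "$categories"}: the number of elements; $size errors if the value is not an array.
val categoryCountExpr = new BsonDocument("$size", new BsonString("$categories"))

val projectStage = Aggregates.project(
  Projections.fields(
    Projections.excludeId(),
    Projections.include("name"),
    Projections.computed("firstCategory", firstCategoryExpr),
    Projections.computed("categoryCount", categoryCountExpr) // hypothetical output field
  )
)

// Render the stage with the default codec registry to inspect its JSON form.
val projectJson: String = projectStage
  .toBsonDocument(classOf[BsonDocument], MongoClientSettings.getDefaultCodecRegistry())
  .toJson()

println(projectJson)
```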
In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory, whose value is the first element in the categories array. The example uses Aggregates.project() and various Projections class methods to build the $project stage:
collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        Projections.computed(
          "firstCategory",
          Document("$arrayElemAt" -> Seq("$categories", 0))
        )
      )
    )
  )
).printResults()
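The following sketch models, with plain Scala collections, what this $project stage returns for each input document. Restaurant and the sample data are hypothetical. A Map stands in for the output document so that a missing field can be represented by its absence: when categories is an empty array, index 0 is out of bounds, $arrayElemAt returns no value, and firstCategory does not appear in the output. A document with no categories field at all, for which $arrayElemAt would return null, is not modeled here.

```scala
// Hypothetical, simplified stand-in for an input document.
final case class Restaurant(name: String, categories: Seq[String])

// Mirrors {$project: {_id: 0, name: 1, firstCategory: {$arrayElemAt: ["$categories", 0]}}}.
// headOption is None for an empty array, so firstCategory is left out of the result.
def projectFirstCategory(r: Restaurant): Map[String, String] =
  Map("name" -> r.name) ++ r.categories.headOption.map(first => "firstCategory" -> first)

// Made-up sample data; not taken from restaurants.json.
val projected: Seq[Map[String, String]] = Seq(
  Restaurant("Alpha Bakery", Seq("Bakery", "Cafe")),
  Restaurant("No Tags Diner", Seq.empty)
).map(projectFirstCategory)

projected.foreach(println)
```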
Explain an Aggregation
To $explain an aggregation pipeline, call the AggregateObservable.explain() method:
collection.aggregate(
  Seq(
    Aggregates.filter(Filters.equal("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )
).explain().printResults()
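Because the observables returned by aggregate() and explain() are cold (nothing is sent until they are consumed), one way to keep a pipeline and its plan in sync is to define the stages once and pass the same Seq to both. The following sketch assumes a running deployment that holds the test.restaurants data and uses the illustrative name bakeryPipeline. It requests the plan at EXECUTION_STATS verbosity through the explain overload that takes a com.mongodb.ExplainVerbosity, then blocks for at most 10 seconds with Await instead of relying on the Quick Start helpers.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import com.mongodb.ExplainVerbosity
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}

val mongoClient: MongoClient = MongoClient()
val collection: MongoCollection[Document] =
  mongoClient.getDatabase("test").getCollection("restaurants")

// Illustrative name: the stages are defined once so the same pipeline can be run and explained.
val bakeryPipeline: Seq[Bson] = Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)

// Nothing is sent to the server yet; the observable is cold until it is consumed.
val explainObservable: SingleObservable[Document] =
  collection.aggregate(bakeryPipeline).explain[Document](ExplainVerbosity.EXECUTION_STATS)

// Requires a reachable server; waits at most 10 seconds for the explain output.
val plan: Document = Await.result(explainObservable.toFuture(), 10.seconds)
println(plan.toJson())

mongoClient.close()
```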