Aggregation Framework

The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.

To learn more about aggregation, see Aggregation Pipeline in the Server manual.
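The pipeline idea can be illustrated without a database. The following is a minimal sketch in plain Scala, not driver code: the Doc and Stage types, the two toy stages, and the sample data are all assumptions made up for illustration. It only shows that each stage consumes the previous stage's output, in order.

```scala
object PipelineSketch {
  // Hypothetical types: a document is a simple map, and a stage
  // transforms one sequence of documents into another.
  type Doc = Map[String, Any]
  type Stage = Seq[Doc] => Seq[Doc]

  // Running a pipeline feeds each stage's output into the next stage, in order.
  def runPipeline(stages: Seq[Stage], input: Seq[Doc]): Seq[Doc] =
    stages.foldLeft(input)((docs, stage) => stage(docs))

  // Two toy stages (assumptions): keep 5-star documents, then keep only the name field.
  val onlyFiveStars: Stage = _.filter(_.get("stars").contains(5))
  val nameOnly: Stage = _.map(_.filter { case (key, _) => key == "name" })

  // Made-up input documents.
  val input: Seq[Doc] = Seq(
    Map("name" -> "A", "stars" -> 5),
    Map("name" -> "B", "stars" -> 3)
  )

  def main(args: Array[String]): Unit =
    println(runPipeline(Seq(onlyFiveStars, nameOnly), input))
}
```

Reordering the stages changes what each one sees, which is why stage order matters in a real pipeline as well.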

You must set up the following components to run the code examples in this guide:

  • A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.

  • The following import statements:

import org.mongodb.scala._
// Import the helper objects themselves, because the examples below
// qualify each call (for example, Aggregates.filter and Filters.equal).
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters, Projections}

Note

This guide uses the Observable implicits covered in the Quick Start Primer. These implicits supply the printResults() helper that the examples call.

First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.

The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.

To perform aggregation, pass a sequence of aggregation stages to the MongoCollection.aggregate() method. The driver provides the Aggregates helper class, which contains builders for aggregation stages.

In this example, the aggregation pipeline performs the following tasks:

  • Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.filter() to build the $match stage.

  • Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression.

collection.aggregate(Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()
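To make the effect of the two stages concrete, here is a plain-Scala analogue that uses ordinary collections instead of the driver. It is a sketch under stated assumptions: the Restaurant case class and the sample data are hypothetical and are not taken from restaurants.json. The filter call plays the role of $match, and groupBy followed by a size count plays the role of $group with a sum of 1.

```scala
object MatchGroupSketch {
  // Hypothetical, simplified shape of a restaurant document.
  final case class Restaurant(name: String, stars: Int, categories: Seq[String])

  // Made-up sample data for illustration only.
  val sample: Seq[Restaurant] = Seq(
    Restaurant("A", 4, Seq("Bakery", "Cafe")),
    Restaurant("B", 4, Seq("Bakery")),
    Restaurant("C", 5, Seq("Bakery", "Desserts")),
    Restaurant("D", 5, Seq("Pizza"))
  )

  // $match analogue: keep documents whose categories contain the given value.
  // $group analogue: one entry per distinct stars value, counting its documents.
  def countByStars(docs: Seq[Restaurant], category: String): Map[Int, Int] =
    docs
      .filter(_.categories.contains(category))
      .groupBy(_.stars)
      .map { case (stars, group) => stars -> group.size }

  def main(args: Array[String]): Unit =
    println(countByStars(sample, "Bakery"))
}
```

For this sample, countByStars(sample, "Bakery") equals Map(4 -> 2, 5 -> 1). In the real pipeline, each output document carries the group key in its _id field and the tally in its count field.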

For $group accumulator expressions, the driver provides the Accumulators helper class. For other aggregation expressions, manually build the expression by using the Document class.

In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory whose value is the first element in the categories array. The example uses Aggregates.project() and various Projections class methods to build the $project stage:

collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        Projections.computed(
          "firstCategory",
          Document("$arrayElemAt" -> Seq("$categories", 0))
        )
      )
    )
  )
).printResults()
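The shape of the projected output can be sketched with plain Scala collections. This is an illustration only, not driver code: the Restaurant and Projected case classes and the sample data are hypothetical. Taking headOption stands in for $arrayElemAt at index 0, with None standing in for the case where the array has no element at that index.

```scala
object ProjectSketch {
  // Hypothetical, simplified document shapes (assumptions for illustration).
  final case class Restaurant(id: Int, name: String, categories: Seq[String])
  final case class Projected(name: String, firstCategory: Option[String])

  // $project analogue: drop the id, keep name, and compute firstCategory
  // as the element at index 0 of categories, if there is one.
  def project(doc: Restaurant): Projected =
    Projected(doc.name, doc.categories.headOption)

  // Made-up sample data, including a document with an empty categories array.
  val sample: Seq[Restaurant] = Seq(
    Restaurant(1, "A", Seq("Bakery", "Cafe")),
    Restaurant(2, "B", Seq.empty)
  )

  def main(args: Array[String]): Unit =
    sample.map(project).foreach(println)
}
```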

To view the execution plan for an aggregation pipeline instead of its results, call the explain() method on the AggregateObservable that aggregate() returns:

collection.aggregate(
  Seq(
    Aggregates.filter(Filters.equal("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )
).explain().printResults()
