Docs Menu
Docs Home
/ / /
Scala
/

Change Streams

On this page

  • Prerequisites
  • Connect to a MongoDB Deployment
  • Watch for Changes in a Collection
  • Watch for Changes on a Database
  • Watch for Changes on All Databases
  • Filtering Content

MongoDB Server version 3.6 introduces the $changeStream aggregation pipeline operator.

Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection type includes the watch() method. The ChangeStreamObservable instance sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.

You must set up the following components to run the code examples in this guide:

  • A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub.

  • The following import statements:

import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._

Note

This guide uses the Observable implicits as covered in the Quick Start Primer.

First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.

The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.

To create a change stream use one of the MongoCollection.watch() methods.

In the following example, the change stream prints out all changes it observes:

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch

Applications can open a single change stream to watch all non-system collections of a database. To create such a change stream, use one of the MongoDatabase.watch() methods.

In the following example, the change stream prints out all the changes it observes on the given database:

val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch

Applications can open a single change stream to watch all non-system collections of all databases in a MongoDB deployment. To create such a change stream, use one of the MongoClient.watch() methods.

In the following example, the change stream prints out all the changes it observes in the deployment to which the MongoClient is connected:

val observer = LatchedObserver()
client.watch().subscribe(observer)
observer.await() // Block waiting for the latch

You can pass a list of aggregation stages to the watch() method to modify the data returned by the $changeStream operator.

Note

Not all aggregation operators are supported. See Change Streams in the Server manual to learn more.

In the following example, the change stream prints out all changes it observes corresponding to insert, update, replace and delete operations.

First, the pipeline includes a $match stage to filter for documents where the operationType is either an insert, update, replace or delete. Then, it sets the fullDocument to FullDocument.UPDATE_LOOKUP, so that the document after the update is included in the results:

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch

Back

Aggregation Framework