Change Streams
MongoDB Server version 3.6 introduces the $changeStream aggregation pipeline operator.
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection type includes the watch() method. The ChangeStreamObservable instance that watch() returns sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.
Prerequisites
You must set up the following components to run the code examples in this guide:
- A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
- The following import statements:
    import java.util.concurrent.CountDownLatch
    import org.mongodb.scala._
    import org.mongodb.scala.model.Aggregates._
    import org.mongodb.scala.model.Filters._
    import org.mongodb.scala.model.changestream._
Note
This guide uses the Observable implicits as covered in the Quick Start Primer.
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.
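The following code connects to a MongoDB deployment running on localhost on port 27017. Because change streams are available only on replica sets and sharded clusters, the deployment must be one of these rather than a standalone server. The code then defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection: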
    val mongoClient: MongoClient = MongoClient()
    val database: MongoDatabase = mongoClient.getDatabase("test")
    val collection: MongoCollection[Document] = database.getCollection("restaurants")
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Watch for Changes in a Collection
To create a change stream, use one of the MongoCollection.watch() methods.
In the following example, the change stream prints out all changes it observes:
    case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
      val latch = new CountDownLatch(1)

      override def onSubscribe(subscription: Subscription): Unit =
        subscription.request(Long.MaxValue) // Request data

      override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit =
        println(changeDocument)

      override def onError(throwable: Throwable): Unit = {
        println(s"Error: '$throwable'")
        latch.countDown()
      }

      override def onComplete(): Unit = latch.countDown()

      def await(): Unit = latch.await()
    }

    val observer = LatchedObserver()
    collection.watch().subscribe(observer)
    observer.await() // Block waiting for the latch
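A change stream typically stays open until it fails or is closed, so an observer that only releases its latch in onError() or onComplete() can block for a long time. The following is a minimal sketch, not part of the driver, of an observer that stops after a fixed number of events. The BoundedObserver name, its limit parameter, and the timeout-based await() are assumptions made for illustration; the sketch relies on Subscription.unsubscribe() to stop the stream.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.mongodb.scala._
import org.mongodb.scala.model.changestream._

// Hypothetical helper (not provided by the driver): prints change events and
// releases the latch after `limit` events, on error, or on completion.
class BoundedObserver(limit: Int) extends Observer[ChangeStreamDocument[Document]] {
  require(limit > 0, "limit must be positive")

  private val latch = new CountDownLatch(1)
  @volatile private var active: Option[Subscription] = None
  private var seen = 0 // Only touched from onNext, which is called serially

  override def onSubscribe(subscription: Subscription): Unit = {
    active = Some(subscription)
    subscription.request(limit.toLong) // Request only as many events as needed
  }

  override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = {
    println(changeDocument)
    seen += 1
    if (seen >= limit) {
      active.foreach(_.unsubscribe()) // Stop receiving further events
      latch.countDown()
    }
  }

  override def onError(throwable: Throwable): Unit = {
    println(s"Error: '$throwable'")
    latch.countDown()
  }

  override def onComplete(): Unit = latch.countDown()

  // Returns true if the latch was released before the timeout elapsed
  def await(timeout: Long, unit: TimeUnit): Boolean = latch.await(timeout, unit)
}

// Possible usage, assuming the `collection` value defined earlier in this guide:
//   val bounded = new BoundedObserver(3)
//   collection.watch().subscribe(bounded)
//   bounded.await(1, TimeUnit.MINUTES)
```

The timeout on await() is a design choice for this sketch: it lets a test or script give up if no changes arrive, instead of waiting indefinitely.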
Watch for Changes on a Database
Applications can open a single change stream to watch all non-system collections of a database. To create such a change stream, use one of the MongoDatabase.watch() methods.
In the following example, the change stream prints out all the changes it observes on the given database:
    val observer = LatchedObserver()
    database.watch().subscribe(observer)
    observer.await() // Block waiting for the latch
Watch for Changes on All Databases
Applications can open a single change stream to watch all non-system collections of all databases in a MongoDB deployment. To create such a change stream, use one of the MongoClient.watch() methods.
In the following example, the change stream prints out all the changes it observes in the deployment to which the MongoClient is connected:
    val observer = LatchedObserver()
    // mongoClient is the MongoClient instance created in the connection step
    mongoClient.watch().subscribe(observer)
    observer.await() // Block waiting for the latch
Filtering Content
You can pass a list of aggregation stages to the watch() method to modify the data returned by the $changeStream operator.
Note
Not all aggregation operators are supported. See Change Streams in the Server manual to learn more.
In the following example, the change stream prints out all changes it observes corresponding to insert, update, replace, and delete operations.
First, the pipeline includes a $match stage to filter for documents where the operationType is insert, update, replace, or delete. Then, it sets the fullDocument option to FullDocument.UPDATE_LOOKUP, so that the document after the update is included in the results:
    val observer = LatchedObserver()
    // filter and in come from the Aggregates._ and Filters._ imports;
    // in takes the values to match as varargs
    collection.watch(Seq(filter(in("operationType", "insert", "update", "replace", "delete"))))
      .fullDocument(FullDocument.UPDATE_LOOKUP)
      .subscribe(observer)
    observer.await() // Block waiting for the latch
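To check what a pipeline stage sends to the server before passing it to watch(), you can render it to BSON locally. The following sketch needs no running deployment. The PipelinePreview object and its matchOperations and render helpers are hypothetical names introduced for illustration; the sketch assumes the driver's MongoClient.DEFAULT_CODEC_REGISTRY for encoding.

```scala
import org.bson.BsonDocument
import org.bson.conversions.Bson
import org.mongodb.scala.MongoClient
import org.mongodb.scala.model.Aggregates.filter
import org.mongodb.scala.model.Filters.in

object PipelinePreview {
  // Hypothetical helper: builds a $match stage that keeps only change events
  // whose operationType is one of the given values.
  def matchOperations(types: String*): Bson =
    filter(in("operationType", types: _*))

  // Renders a stage to a BsonDocument using the driver's default codecs.
  def render(stage: Bson): BsonDocument =
    stage.toBsonDocument(classOf[BsonDocument], MongoClient.DEFAULT_CODEC_REGISTRY)

  def main(args: Array[String]): Unit = {
    val stage = matchOperations("insert", "delete")
    println(render(stage).toJson()) // Prints the $match stage as JSON
  }
}
```

A stage built this way can be passed to watch() in a Seq, for example collection.watch(Seq(PipelinePreview.matchOperations("insert", "delete"))).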