Watch for Changes
You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them.
The change stream returns change event documents when changes occur. A change event contains information about the updated data.
Open a change stream by calling the watch() method on a MongoCollection, MongoDatabase, or MongoClient object, as shown in the following code example:
ChangeStreamIterable<Document> changeStream = database.watch();
The watch() method optionally takes an aggregation pipeline, a list of stages, as its first parameter. Use the pipeline to filter and transform the change event output, as follows:
List<Bson> pipeline = Arrays.asList(
        Aggregates.match(
                Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
The watch() method returns an instance of ChangeStreamIterable, an interface that offers several methods to access, organize, and traverse the results. ChangeStreamIterable also inherits methods from its parent interface, MongoIterable, which extends the core Java interface Iterable.
You can call forEach() on the ChangeStreamIterable to handle events as they occur, or you can use the iterator() method, which returns a MongoCursor instance that you can use to traverse the results.
You can call methods on the MongoCursor such as hasNext() to check whether additional results exist, next() to return the next change event in the stream, or tryNext() to immediately return either the next available element in the change stream or null. Unlike the MongoCursor returned by other queries, a MongoCursor associated with a change stream waits until a change event arrives before returning a result from next(). As a result, calls to next() on a change stream's MongoCursor never throw a java.util.NoSuchElementException.
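Because next() and hasNext() wait for an event on a change stream cursor, a loop that should stop after a fixed number of events needs to check its own limit before asking the cursor for more. The following is a minimal sketch of that idea, not part of the driver: BoundedEventReader and take() are hypothetical names, and the helper is written against java.util.Iterator (which MongoCursor extends) so that it can be exercised here with an in-memory list standing in for a live change stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the MongoDB driver.
final class BoundedEventReader {

    // Reads at most `limit` events, then returns so the caller can close the cursor.
    // The size check comes first so that hasNext(), which can block on a
    // change stream cursor, is not called once the limit has been reached.
    static <T> List<T> take(Iterator<T> cursor, int limit) {
        List<T> events = new ArrayList<>();
        while (events.size() < limit && cursor.hasNext()) {
            events.add(cursor.next());
        }
        return events;
    }

    public static void main(String[] args) {
        // In-memory stand-in for a change stream cursor (assumption for illustration).
        Iterator<String> standIn = Arrays.asList("insert", "update", "delete").iterator();
        System.out.println(take(standIn, 2)); // prints [insert, update]
    }
}
```

With a real change stream, you would likely pass the MongoCursor returned by iterator() to a helper like this inside a try-with-resources statement, so that the cursor is closed once the events have been read.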
To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamIterable object returned by watch(). See the link to the ChangeStreamIterable API documentation at the bottom of this page for more details on the available methods.
How to Process Change Stream Events with a Callback
To capture events from a change stream, call the forEach() method with a callback function as shown below:
changeStream.forEach(event -> System.out.println("Change observed: " + event));
The callback function triggers when a change event is emitted. You can specify logic in the callback to process the event document when it is received.
Important
forEach() blocks the current thread
Calls to forEach() block the current thread as long as the corresponding change stream listens for events. If your program needs to continue executing other logic, such as processing requests or responding to user input, consider creating and listening to your change stream in a separate thread.
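One way to move the blocking forEach() call off the current thread is to submit it to an ExecutorService. The following is a minimal sketch under stated assumptions: BackgroundWatcher and watchInBackground() are hypothetical names, and the helper accepts any java.lang.Iterable (an interface that ChangeStreamIterable inherits through MongoIterable), so an in-memory list stands in for a live change stream here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical helper, not part of the MongoDB driver.
final class BackgroundWatcher {

    // Runs the blocking forEach() call on a worker thread and returns immediately.
    static <T> Future<?> watchInBackground(ExecutorService executor,
                                           Iterable<T> source,
                                           Consumer<? super T> handler) {
        return executor.submit(() -> source.forEach(handler));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Thread-safe list, because the handler runs on the worker thread.
        List<String> seen = new CopyOnWriteArrayList<>();

        // In-memory stand-in for a change stream (assumption for illustration).
        Iterable<String> standIn = Arrays.asList("insert", "update");
        Future<?> task = watchInBackground(executor, standIn, seen::add);

        // The main thread is free to do other work here.

        // The stand-in is finite, so get() returns; a real change stream keeps listening.
        task.get();
        System.out.println(seen); // prints [insert, update]
        executor.shutdown();
    }
}
```

The handler runs on the worker thread, so any state it shares with the rest of the program should be thread-safe, as the CopyOnWriteArrayList is in this sketch.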
Note
For update operation change events, change streams only return the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamIterable object with the value FullDocument.UPDATE_LOOKUP as follows:
ChangeStreamIterable<Document> changeStream = database.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP);
Example
The following example uses two separate applications to demonstrate how to listen for changes using a change stream:
The first application, named Watch, opens a change stream on the movies collection in the sample_mflix database. Watch uses an aggregation pipeline to filter changes based on operationType so that it only receives insert and update events (deletes are excluded by omission). Watch uses a callback to receive and print the filtered change events that occur on the collection.
The second application, named WatchCompanion, inserts a single document into the movies collection in the sample_mflix database. Next, WatchCompanion updates the document with a new field value. Finally, WatchCompanion deletes the document.
First, run Watch to open the change stream on the collection and define a callback on the change stream using the forEach() method. While Watch is running, run WatchCompanion to generate change events by performing changes to the collection.
Note
This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.
Watch:
package usage.examples;

import java.util.Arrays;
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;

public class Watch {
    public static void main(String[] args) {
        // Replace the uri string with your MongoDB deployment's connection string
        String uri = "<connection string uri>";

        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("sample_mflix");
            MongoCollection<Document> collection = database.getCollection("movies");

            // Only let insert and update events through; all other event types are dropped
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(
                            Filters.in("operationType", Arrays.asList("insert", "update"))));

            // Open the change stream on the movies collection and request
            // the full document for update events
            ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
                    .fullDocument(FullDocument.UPDATE_LOOKUP);

            // Variables captured by a lambda must be effectively final,
            // so a one-element final array serves as a mutable counter
            final int[] numberOfEvents = {0};

            // Blocks the current thread and exits after two events have been printed
            changeStream.forEach(event -> {
                System.out.println("Received a change to the collection: " + event);
                if (++numberOfEvents[0] >= 2) {
                    System.exit(0);
                }
            });
        }
    }
}
WatchCompanion:
package usage.examples;

import org.bson.Document;

import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;

public class WatchCompanion {
    public static void main(String[] args) {
        // Replace the uri string with your MongoDB deployment's connection string
        String uri = "<connection string uri>";

        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("sample_mflix");
            MongoCollection<Document> collection = database.getCollection("movies");

            try {
                // Insert a document, which produces an insert event
                InsertOneResult insertResult = collection.insertOne(
                        new Document("test", "sample movie document"));
                System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());

                // Add a field to the document, which produces an update event
                UpdateResult updateResult = collection.updateOne(
                        new Document("test", "sample movie document"),
                        Updates.set("field2", "sample movie document update"));
                System.out.println("Updated " + updateResult.getModifiedCount() + " document.");

                // Delete the document, which produces a delete event
                DeleteResult deleteResult = collection.deleteOne(
                        new Document("field2", "sample movie document update"));
                System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
            } catch (MongoException me) {
                System.err.println("Unable to insert, update, or delete due to an error: " + me);
            }
        }
    }
}
If you run the preceding applications in sequence, you should see output from the Watch application that is similar to the following. Only the insert and update operations are printed, since the aggregation pipeline filters out the delete operation:
Received a change to the collection: ChangeStreamDocument{
  operationType=OperationType{value='insert'},
  resumeToken={"_data": "825E..."},
  namespace=sample_mflix.movies,
  destinationNamespace=null,
  fullDocument=Document{{_id=5ec3..., test=sample movie document}},
  documentKey={"_id": {"$oid": "5ec3..."}},
  clusterTime=Timestamp{...},
  updateDescription=null,
  txnNumber=null,
  lsid=null,
  wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
  operationType=OperationType{value='update'},
  resumeToken={"_data": "825E..."},
  namespace=sample_mflix.movies,
  destinationNamespace=null,
  fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
  documentKey={"_id": {"$oid": "5ec3..."}},
  clusterTime=Timestamp{...},
  updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
  txnNumber=null,
  lsid=null,
  wallTime=BsonDateTime{value=1657...}
}
You should also see output from the WatchCompanion application that is similar to the following:
Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.
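The Watch application counts events with a one-element final array because a lambda can only capture local variables that are final or effectively final. As an alternative, java.util.concurrent.atomic.AtomicInteger gives the callback a mutable counter with a clearer intent. The following is a minimal sketch, not taken from the example applications: EventCounter and countEvents() are hypothetical names, and an in-memory list stands in for the change stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the MongoDB driver or the example applications.
final class EventCounter {

    // Counts how many events the forEach() callback handled.
    static <T> int countEvents(Iterable<T> events) {
        // The reference is effectively final; the value it holds is mutable.
        AtomicInteger handled = new AtomicInteger();
        events.forEach(event -> handled.incrementAndGet());
        return handled.get();
    }

    public static void main(String[] args) {
        // In-memory stand-in for a change stream (assumption for illustration).
        List<String> standIn = Arrays.asList("insert", "update");
        System.out.println(countEvents(standIn)); // prints 2
    }
}
```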
Tip
Legacy API
If you are using the legacy API, see our FAQ page to learn what changes you need to make to this code example.
For additional information on the classes and methods mentioned on this page, see the following resources:
Change Streams Server Manual Entry
Change Events Server Manual Entry
Aggregation Pipeline Server Manual Entry
Aggregation Stages Server Manual Entry
ChangeStreamIterable API Documentation
MongoCollection.watch() API Documentation
MongoDatabase.watch() API Documentation
MongoClient.watch() API Documentation