Open Change Streams
Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a single collection, database, or deployment.
You can specify a set of aggregation operators to filter and transform the data your application receives. When connecting to a MongoDB deployment v6.0 or later, you can also configure the events to include the document data before and after the change.
Learn how to open and configure your change streams in the following sections:
Open a Change Stream
You can open a change stream to subscribe to specific types of data changes and produce change events in your application.
Select a Scope to Watch
To open a change stream, call the watch() method on an instance of a MongoCollection, MongoDatabase, or MongoClient.
Important
Standalone MongoDB deployments don't support change streams because the feature requires a replica set oplog. To learn more about the oplog, see the Replica Set Oplog MongoDB Server manual page.
The object on which you call the watch() method determines the scope of events that the change stream listens for:
- MongoCollection.watch() monitors a collection.
- MongoDatabase.watch() monitors all collections in a database.
- MongoClient.watch() monitors all changes in the connected MongoDB deployment.
Filter the Events
The watch() method takes an optional aggregation pipeline as its first parameter. The pipeline is a list of stages that filter and transform the change event output, as follows:
List<Bson> pipeline = List.of(
        Aggregates.match(
                Filters.in("operationType", List.of("insert", "update"))),
        Aggregates.match(
                Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
Note
For update operation change events, change streams return only the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamIterable object with the value FullDocument.UPDATE_LOOKUP as follows:
ChangeStreamIterable<Document> changeStream = database.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP);
Manage the Output
The watch() method returns an instance of ChangeStreamIterable, an interface that offers several methods to access, organize, and traverse the results. ChangeStreamIterable also inherits methods from its parent interface, MongoIterable, which extends the core Java interface Iterable.
You can call forEach() on the ChangeStreamIterable to handle events as they occur, or you can use the iterator() method, which returns a MongoChangeStreamCursor instance that you can use to traverse the results.
You can call the following methods on the MongoChangeStreamCursor:
- hasNext(): checks if there are more results.
- next(): returns the next change event in the change stream.
- tryNext(): immediately returns either the next available element in the change stream or null.
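Because tryNext() returns immediately, it lends itself to a polling loop that collects whatever events are ready and then lets the caller do other work. The following is a minimal sketch under stated assumptions, not driver code: it uses only the Java standard library, the Supplier parameter stands in for a method reference such as cursor::tryNext, and the EventPoller class and drainAvailable() method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: collects the events that are available right now without blocking.
final class EventPoller {

    // tryNext stands in for cursor::tryNext, which returns null when no event is ready.
    static <T> List<T> drainAvailable(Supplier<T> tryNext, int maxEvents) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxEvents) {
            T event = tryNext.get();
            if (event == null) {
                break; // nothing more is available at the moment
            }
            batch.add(event);
        }
        return batch;
    }
}
```

With the driver, a call might look like EventPoller.drainAvailable(cursor::tryNext, 100), where cursor is the MongoChangeStreamCursor returned by iterator(). The maxEvents cap is a design choice that keeps one busy stream from monopolizing the calling thread.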
Important
Iterating the Cursor Blocks the Current Thread
Iterating through a change stream by using forEach() or the cursor returned by iterator() blocks the current thread while the corresponding change stream listens for events. If your program needs to continue executing other logic, such as processing requests or responding to user input, consider creating and listening to your change stream in a separate thread.
Unlike the MongoCursor returned by other queries, a MongoChangeStreamCursor associated with a change stream waits until a change event arrives before returning a result from next(). As a result, calls to next() using a change stream's MongoChangeStreamCursor never throw a java.util.NoSuchElementException.
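One way to keep the main thread free is to hand the blocking loop to a dedicated thread. The following is a minimal sketch that uses only the Java standard library. The BlockingQueue is a stand-in for a change stream cursor, chosen because take() waits for the next element much as next() waits for the next change event. The BackgroundListener class and its listen() method are hypothetical names, not part of the driver.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical helper: runs a blocking listener loop on its own thread.
final class BackgroundListener implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // blockingSource.take() stands in for cursor.next(), which waits for an event.
    <T> void listen(BlockingQueue<T> blockingSource, Consumer<T> handler) {
        executor.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handler.accept(blockingSource.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop listening on shutdown
            }
        });
    }

    @Override
    public void close() {
        executor.shutdownNow(); // interrupts the listener thread
    }
}
```

The caller returns from listen() immediately and can continue with other work while the handler runs on the listener thread. In an application that uses the driver, the task passed to the executor would instead iterate the change stream, for example by calling forEach() on the ChangeStreamIterable.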
To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamIterable object returned by watch(). For more details on the available methods, see the ChangeStreamIterable API documentation referenced in the API Documentation section of this page.
Example
This example shows how to open a change stream on the myColl collection and print change stream events as they occur.
The driver stores change stream events in a variable of type ChangeStreamIterable. In the following example, we specify that the driver should populate the ChangeStreamIterable object with Document types. As a result, the driver stores individual change stream events as ChangeStreamDocument objects.
MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));
An insert operation on the collection produces the following output:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Watch Example: Full Files
This example demonstrates how to open a change stream by using the watch() method. The Watch.java file calls the watch() method with a pipeline as an argument to filter for only "insert" and "update" events. The WatchCompanion.java file inserts, updates, and deletes a document.
To use the following examples, run the files in this order:
1. Run the Watch.java file.
2. Run the WatchCompanion.java file.
Note
The Watch.java file continues running until it receives the two change events that running the WatchCompanion.java file generates.
Watch.java:
/**
 * This file demonstrates how to open a change stream by using the Java driver.
 * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
 * to change events in the "movies" collection. The code uses a change stream with a pipeline
 * to filter for only "insert" and "update" events.
 */
package org.example;

import java.util.Arrays;
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;

public class Watch {
    public static void main(String[] args) {
        // Replace the uri string with your MongoDB deployment's connection string
        String uri = "<connection string uri>";

        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("sample_mflix");
            MongoCollection<Document> collection = database.getCollection("movies");

            // Creates instructions to match insert and update operations
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(
                            Filters.in("operationType",
                                    Arrays.asList("insert", "update"))));

            // Creates a change stream on the "movies" collection that receives
            // change events for the specified operations
            ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
                    .fullDocument(FullDocument.UPDATE_LOOKUP);

            final int[] numberOfEvents = {0};

            // Prints a message each time the change stream receives a change event,
            // until it receives two events
            changeStream.forEach(event -> {
                System.out.println("Received a change to the collection: " + event);
                if (++numberOfEvents[0] >= 2) {
                    System.exit(0);
                }
            });
        }
    }
}
WatchCompanion.java:
// Performs CRUD operations to generate change events when run with the Watch application
package org.example;

import org.bson.Document;

import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;

public class WatchCompanion {
    public static void main(String[] args) {
        // Replace the uri string with your MongoDB deployment's connection string
        String uri = "<connection string uri>";

        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("sample_mflix");
            MongoCollection<Document> collection = database.getCollection("movies");

            try {
                // Inserts a sample document into the "movies" collection and prints its ID
                InsertOneResult insertResult = collection.insertOne(
                        new Document("test", "sample movie document"));
                System.out.println("Inserted document id: " + insertResult.getInsertedId());

                // Updates the sample document and prints the number of modified documents
                UpdateResult updateResult = collection.updateOne(
                        new Document("test", "sample movie document"),
                        Updates.set("field2", "sample movie document update"));
                System.out.println("Updated " + updateResult.getModifiedCount() + " document.");

                // Deletes the sample document and prints the number of deleted documents
                DeleteResult deleteResult = collection.deleteOne(
                        new Document("field2", "sample movie document update"));
                System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");

            // Prints a message if any exceptions occur during the operations
            } catch (MongoException me) {
                System.err.println("Unable to insert, update, or delete due to an error: " + me);
            }
        }
    }
}
Full File Example Output
The preceding applications generate the following output.
Watch.java captures only the insert and update operations, since the aggregation pipeline filters out the delete operation:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion prints a summary of the operations it completed:
Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.
To learn more about the watch() method, see the following API documentation:
Apply Aggregation Operators to your Change Stream
You can pass an aggregation pipeline as a parameter to the watch() method to specify which change events the change stream receives.
To learn which aggregation operators your MongoDB Server version supports, see Modify Change Stream Output.
Example
The following code example shows how you can apply an aggregation pipeline to configure your change stream to receive change events for only insert and update operations:
MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
        Aggregates.match(
                Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
        System.out.println("Received a change to the collection: " + event));
An update operation on the collection produces the following output:
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Split Large Change Stream Events
Starting in MongoDB 7.0, you can use the $changeStreamSplitLargeEvent aggregation stage to split events that exceed 16 MB into smaller fragments.
Use $changeStreamSplitLargeEvent only when strictly necessary. For example, use $changeStreamSplitLargeEvent if your application requires full document pre- or post-images, and generates events that exceed 16 MB.
The $changeStreamSplitLargeEvent stage returns the fragments sequentially. You can access the fragments by using a change stream cursor. Each fragment includes a SplitEvent object containing the following fields:
Field | Description |
---|---|
fragment | The index of the fragment, starting at 1 |
of | The total number of fragments that compose the split event |
The following example modifies your change stream by using the $changeStreamSplitLargeEvent aggregation stage to split large events:
ChangeStreamIterable<Document> changeStream = collection.watch(
        List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
Note
You can have only one $changeStreamSplitLargeEvent stage in your aggregation pipeline, and it must be the last stage in the pipeline.
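You can call the getSplitEvent() method on a change event returned by your change stream cursor to access the SplitEvent. Because tryNext() returns null when no event is available, the following example checks the result before using it:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
ChangeStreamDocument<Document> fragment = cursor.tryNext();
if (fragment != null) {
    // getSplitEvent() returns null if the event was not split
    SplitEvent splitEvent = fragment.getSplitEvent();
}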
For more information about the $changeStreamSplitLargeEvent aggregation stage, see the $changeStreamSplitLargeEvent server documentation.
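Since fragments arrive sequentially, an application typically gathers them until it has seen the last one. The following is a minimal sketch of that bookkeeping, assuming each fragment carries a 1-based index and the total fragment count as described in this section. It uses only the Java standard library: the Fragment class is a hypothetical stand-in for a change event together with its SplitEvent, the payload field is illustrative, and the Iterator stands in for the change stream cursor.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for one fragment of a split change event.
final class Fragment {
    final int fragment;   // 1-based index of this fragment
    final int of;         // total number of fragments in the split event
    final String payload; // illustrative content carried by this fragment

    Fragment(int fragment, int of, String payload) {
        this.fragment = fragment;
        this.of = of;
        this.payload = payload;
    }
}

final class FragmentCollector {

    // Reads fragments in order until the last fragment of one event arrives.
    static List<Fragment> collectOneEvent(Iterator<Fragment> fragments) {
        List<Fragment> parts = new ArrayList<>();
        while (fragments.hasNext()) {
            Fragment part = fragments.next();
            parts.add(part);
            if (part.fragment == part.of) {
                break; // every fragment of this event has been received
            }
        }
        return parts;
    }
}
```

The sketch relies on fragments of one event arriving consecutively and in order. How the collected fragments are merged into a single event depends on which fields each fragment contains, so that step is left out.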
Include Pre-images and Post-images
You can configure the change event to contain or omit the following data:
- The pre-image, a document that represents the version of the document before the operation, if it exists
- The post-image, a document that represents the version of the document after the operation, if it exists
Important
You can enable pre- and post-images on collections only if your deployment uses MongoDB v6.0 or later.
To receive change stream events that include a pre-image or post-image, you must perform the following actions:
1. Enable pre-images and post-images for the collection on your MongoDB deployment.
Tip
To learn how to enable pre- and post-images on your deployment, see Change Streams with Document Pre- and Post-Images in the Server manual.
To learn how to instruct the driver to create a collection with pre-images and post-images enabled, see the Create a Collection with Pre-Image and Post-Images Enabled section.
2. Configure your change stream to retrieve the pre-images, the post-images, or both.
Tip
To configure your change stream to record the pre-image in change events, see the Pre-image Configuration Example.
To configure your change stream to record the post-image in change events, see the Post-image Configuration Example.
Create a Collection with Pre-Image and Post-Images Enabled
To use the driver to create a collection with the pre-image and post-image options enabled, specify an instance of ChangeStreamPreAndPostImagesOptions and call the createCollection() method as shown in the following example:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(
        new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);
You can change the pre-image and post-image option in an existing collection by running the collMod command from the MongoDB Shell. To learn how to perform this operation, see the entry on collMod in the Server manual.
Warning
If you enabled pre-images or post-images on a collection, modifying these settings with collMod can cause existing change streams on that collection to fail.
Pre-image Configuration Example
The following code example shows how you can configure a change stream on the myColl collection to include the pre-image and output any change events:
MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
        .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));
The preceding example configures the change stream to use the FullDocumentBeforeChange.REQUIRED option. This option configures the change stream to require pre-images for replace, update, and delete change events. If the pre-image is not available, the driver raises an error.
Suppose you update the value of the amount field in a document from 150 to 2000. This change event produces the following output:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
For a list of options, see the FullDocumentBeforeChange API documentation.
Post-image Configuration Example
The following code example shows how you can configure a change stream on the myColl collection to include the post-image and output any change events:
MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
        .fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));
The preceding example configures the change stream to use the FullDocument.WHEN_AVAILABLE option. This option configures the change stream to return the post-image of the modified document for replace and update change events, if it's available.
Suppose you update the value of the color field in a document from "purple" to "pink". The change event produces the following output, in which the post-image reflects the new value:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=pink, ...}}, updatedFields={"color": "pink"}, ... }
For a list of options, see the FullDocument API documentation.
Additional Information
API Documentation
For more information about the methods and classes used to manage change streams, see the following API documentation: