Docs Menu
Docs Home
/ / /
Java Sync Driver
/

Watch for Changes

You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them.

The change stream returns change event documents when changes occur. A change event contains information about the updated data.

Open a change stream by calling the watch() method on a MongoCollection, MongoDatabase, or MongoClient object as shown in the following code example:

ChangeStreamIterable<Document> changeStream = database.watch();

The watch() method optionally takes an aggregation pipeline which consists of an array of stages as the first parameter to filter and transform the change event output as follows:

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

The watch() method returns an instance of ChangeStreamIterable, a class that offers several methods to access, organize, and traverse the results. ChangeStreamIterable also inherits methods from its parent class, MongoIterable which implements the core Java interface Iterable.

You can call forEach() on the ChangeStreamIterable to handle events as they occur, or you can use the iterator() method which returns a MongoCursor instance that you can use to traverse the results.

You can call methods on the MongoCursor such as hasNext() to check whether additional results exist, next() to return the next document in the collection, or tryNext(), to immediately return either the next available element in the change stream or null. Unlike the MongoCursor returned by other queries, a MongoCursor associated with a change stream waits until a change event arrives before returning a result from next(). As a result, calls to next() using a change stream's MongoCursor never throw a java.util.NoSuchElementException.

To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamIterable object returned by watch(). See the link to the ChangeStreamIterable API documentation at the bottom of this example for more details on the available methods.

To capture events from a change stream, call the forEach() method with a callback function as shown below:

changeStream.forEach(event -> System.out.println("Change observed: " + event));

The callback function triggers when a change event is emitted. You can specify logic in the callback to process the event document when it is received.

Important

forEach() blocks the current thread

Calls to forEach() block the current thread as long as the corresponding change stream listens for events. If your program needs to continue executing other logic, such as processing requests or responding to user input, consider creating and listening to your change stream in a separate thread.

Note

For update operation change events, change streams only return the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamIterable object with the value FullDocument.UPDATE_LOOKUP as follows:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

The following example uses two separate applications to demonstrate how to listen for changes using a change stream:

  • The first application, named Watch, opens a change stream on the movies collection in the sample_mflix database. Watch uses an aggregation pipeline to filter changes based on operationType so that it only receives insert and update events (deletes are excluded by omission). Watch uses a callback to receive and print the filtered change events that occur on the collection.

  • The second application, named WatchCompanion, inserts a single document into the movies collection in the sample_mflix database. Next, WatchCompanion updates the document with a new field value. Finally, WatchCompanion deletes the document.

First, run Watch to open the change stream on the collection and define a callback on the change stream using the forEach() method. While Watch is running, run WatchCompanion to generate change events by performing changes to the collection.

Note

This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.

Watch:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package usage.examples;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

// Performs CRUD operations to generate change events when run with the Watch application
package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

If you run the preceding applications in sequence, you should see output from the Watch application that is similar to the following. Only the insert and update operations are printed, since the aggregation pipeline filters out the delete operation:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

You should also see output from the WatchCompanion application that is similar to the following:

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Tip

Legacy API

If you are using the legacy API, see our FAQ page to learn what changes you need to make to this code example.

For additional information on the classes and methods mentioned on this page, see the following resources:

  • Change Streams Server Manual Entry

  • Change Events Server Manual Entry

  • Aggregation Pipeline Server Manual Entry

  • Aggregation Stages Server Manual Entry

  • ChangeStreamIterable API Documentation

  • MongoCollection.watch() API Documentation

  • MongoDatabase.watch() API Documentation

  • MongoClient.watch() API Documentation

Back

Perform Bulk Operations