Watch for Changes
Open a Change Stream
You can watch for changes in MongoDB using the watch() method on the following objects:
Collection
Db
MongoClient
For each object, the watch() method opens a change stream that emits change event documents as changes occur.
The watch() method optionally takes an aggregation pipeline, an array of aggregation stages, as the first parameter. The aggregation stages filter and transform the change events.
In the following snippet, the $match stage matches all change event documents with a runtime value of less than 15, filtering all others out.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);
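As a further illustration of a pipeline, the following sketch builds a $match stage on the operationType field, which change event documents carry. The buildPipeline helper and its parameter are hypothetical names used only for this example; the array it returns is what you would pass as the first parameter to watch().

```javascript
// Hypothetical helper (not part of the driver): build a pipeline that keeps
// only change events whose operationType is in the given list.
function buildPipeline(operationTypes) {
  return [
    // operationType holds values such as "insert", "update", or "delete".
    { $match: { operationType: { $in: operationTypes } } },
  ];
}

const insertAndDeletePipeline = buildPipeline(["insert", "delete"]);
console.log(JSON.stringify(insertAndDeletePipeline));

// Assumed usage, given an existing collection object named myColl:
// const changeStream = myColl.watch(insertAndDeletePipeline);
```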
The watch() method accepts an options object as the second parameter. Refer to the links at the end of this section for more information on the settings you can configure with this object.
The watch() method returns an instance of a ChangeStream. You can read events from change streams by iterating over them or listening for events.
Starting in version 4.12, ChangeStream objects are async iterables. With this change, you can use for-await loops to retrieve events from an open change stream:
for await (const change of changeStream) {
  console.log("Received change: ", change);
}
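A for-await loop over a change stream keeps waiting for new events, so code that needs only a few events has to leave the loop itself. The following sketch shows one way to do that. The collectChanges helper and its limit parameter are hypothetical names, and closing the stream in a finally block is a defensive choice made for this example rather than a driver requirement.

```javascript
// Hypothetical helper: read up to `limit` events (assumes limit >= 1),
// then close the change stream and return the collected events.
async function collectChanges(changeStream, limit) {
  const changes = [];
  try {
    for await (const change of changeStream) {
      changes.push(change);
      // Leave the loop once enough events have arrived.
      if (changes.length >= limit) break;
    }
  } finally {
    // Close on every path, including errors, to free resources.
    await changeStream.close();
  }
  return changes;
}

// Assumed usage, given an open change stream:
// const firstThree = await collectChanges(myColl.watch(), 3);
```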
You can call methods on the ChangeStream object such as:
hasNext() to check for remaining documents in the stream
next() to request the next document in the stream
close() to close the ChangeStream
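The three methods above can be combined into a manual read loop. The following is a minimal sketch: processChanges and handle are hypothetical names, and the convention that the handler returns false to stop is an assumption made for this example.

```javascript
// Hypothetical helper: process events one at a time with hasNext() and next().
async function processChanges(changeStream, handle) {
  try {
    // hasNext() resolves to true when another event is available.
    while (await changeStream.hasNext()) {
      const change = await changeStream.next();
      // Stop reading when the handler returns false.
      if ((await handle(change)) === false) break;
    }
  } finally {
    // Close the stream whether the loop ended normally or threw.
    await changeStream.close();
  }
}

// Assumed usage: stop after the first delete event.
// await processChanges(myColl.watch(), (c) => c.operationType !== "delete");
```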
You can attach listener functions to the ChangeStream object by calling the on() method. This method is inherited from the Node.js EventEmitter class. Pass the string "change" as the first parameter and your listener function as the second parameter, as shown below:
changeStream.on("change", (changeEvent) => {
  /* your listener function */
});
The listener function triggers when a change event is emitted. You can specify logic in the listener to process the change event document when it is received.
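Because the change stream is an event emitter, it can be useful to register an "error" listener next to the "change" listener. An emitter with no "error" listener throws when an error is emitted. The sketch below shows the idea; listenForChanges and onChange are hypothetical names, and logging the error is only a placeholder for real error handling.

```javascript
// Hypothetical helper: register a "change" listener and an "error" listener.
function listenForChanges(changeStream, onChange) {
  changeStream.on("change", (changeEvent) => {
    // Hand each change event document to the caller's function.
    onChange(changeEvent);
  });
  changeStream.on("error", (error) => {
    // Placeholder handling: log the error instead of letting it throw.
    console.error("Change stream error:", error);
  });
  return changeStream;
}

// Assumed usage:
// listenForChanges(myColl.watch(), (event) => console.log(event.operationType));
```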
You can control the change stream by calling pause() to stop emitting events or resume() to continue to emit events.
To stop processing change events, call the close() method on the ChangeStream instance. This closes the change stream and frees resources.
changeStream.close();
Warning
Using a ChangeStream in EventEmitter and Iterator mode concurrently is not supported by the driver and causes an error. This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.
Examples
Iteration
The following example opens a change stream on the haikus collection in the insertDB database and prints change events as they occur:
Note
You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Usage Examples guide.
import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.
const uri = "<connection string uri>";

const client = new MongoClient(uri);

let changeStream;
async function run() {
  try {
    const database = client.db("insertDB");
    const haikus = database.collection("haikus");

    // Open a Change Stream on the "haikus" collection
    changeStream = haikus.watch();

    // Print change events
    for await (const change of changeStream) {
      console.log("Received change:\n", change);
    }

    await changeStream.close();

  } finally {
    await client.close();
  }
}
run().catch(console.dir);
Note
Identical Code Snippets
The code for this example is identical in JavaScript and TypeScript. There are no TypeScript-specific features of the driver relevant to this use case.
When you run this code and then make a change to the haikus collection, such as performing an insert or delete operation, you can see the change event document printed in your terminal.
For example, if you insert a document into the collection, the code prints the following output:
Received change:
{
  _id: { _data: '...' },
  operationType: 'insert',
  clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
  fullDocument: {
    _id: new ObjectId("..."),
    ...
  },
  ns: { db: 'insertDB', coll: 'haikus' },
  documentKey: { _id: new ObjectId("...") }
}
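To show how the fields in this output might be used, the following sketch turns a change event document into a one-line summary. The describeChange helper is a hypothetical name, and the sample event passed to it is a hand-written stand-in with a plain string _id, not real driver output.

```javascript
// Hypothetical helper: summarize a change event using the operationType,
// ns, and documentKey fields shown in the output above.
function describeChange(change) {
  const { operationType, ns, documentKey } = change;
  // Fall back to placeholders when a field is absent from the event.
  const namespace = ns ? `${ns.db}.${ns.coll}` : "(unknown namespace)";
  const id = documentKey ? String(documentKey._id) : "(no document key)";
  return `${operationType} on ${namespace}, _id: ${id}`;
}

// Hand-written sample event for illustration only.
const sampleInsertEvent = {
  operationType: "insert",
  ns: { db: "insertDB", coll: "haikus" },
  documentKey: { _id: "abc123" },
};
console.log(describeChange(sampleInsertEvent));
// prints "insert on insertDB.haikus, _id: abc123"
```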
Note
Receive Full Documents From Updates
Change events that contain information on update operations only return the modified fields by default rather than the full updated document. You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:
const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);
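The difference between the two modes shows up in the shape of an update event. The following sketch reads the modified fields from updateDescription and, when present, the looked-up document from fullDocument. The summarizeUpdate helper is a hypothetical name, and the sample event is hand-written for illustration.

```javascript
// Hypothetical helper: extract what changed from an update event.
// Returns null for any event that is not an update.
function summarizeUpdate(change) {
  if (change.operationType !== "update") return null;
  const { updatedFields = {}, removedFields = [] } = change.updateDescription ?? {};
  return {
    updated: Object.keys(updatedFields),
    removed: removedFields,
    // Populated only if the stream was opened with fullDocument: "updateLookup".
    fullDocument: change.fullDocument ?? null,
  };
}

// Hand-written sample update event, as it might look without "updateLookup".
const sampleUpdateEvent = {
  operationType: "update",
  updateDescription: { updatedFields: { title: "New title" }, removedFields: ["draft"] },
};
console.log(summarizeUpdate(sampleUpdateEvent));
```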
Listener Function
The following example opens a change stream on the haikus collection in the insertDB database. Let's create a listener function to receive and print change events that occur on the collection.
First, open the change stream on the collection and then define a listener on the change stream using the on() method. Once you set the listener, generate a change event by making a change to the collection.
To generate the change event on the collection, let's use the insertOne() method to add a new document. Since insertOne() may run before the listener function can register, we use a timer, defined as simulateAsyncPause, to wait 1 second before executing the insert.
We also use simulateAsyncPause after the insertion of the document. This provides ample time for the listener function to receive the change event and complete its execution before we close the ChangeStream instance using the close() method.
Note
Reason to include timers
The timers used in this example are only for demonstration purposes. They make sure that there is enough time to register the listener and have the listener process the change event before exiting.
import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.
const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
  new Promise(resolve => {
    setTimeout(() => resolve(), 1000);
  });

let changeStream;
async function run() {
  try {
    const database = client.db("insertDB");
    const haikus = database.collection("haikus");

    // open a Change Stream on the "haikus" collection
    changeStream = haikus.watch();

    // set up a listener when change events are emitted
    changeStream.on("change", next => {
      // process any change event
      console.log("received a change to the collection: \t", next);
    });

    await simulateAsyncPause();

    // insert into the watched collection to generate a change event
    await haikus.insertOne({
      title: "Record of a Shriveled Datum",
      content: "No bytes, no problem. Just insert a document, in MongoDB",
    });

    await simulateAsyncPause();

    await changeStream.close();

    console.log("closed the change stream");
  } finally {
    await client.close();
  }
}
run().catch(console.dir);
Note
Identical Code Snippets
The code for this example is identical in JavaScript and TypeScript. There are no TypeScript-specific features of the driver relevant to this use case.
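Outside of a demonstration, you might prefer to wait for the event itself rather than for a fixed second. The following sketch wraps the "change" and "error" events in a promise with a timeout. The waitForChange helper and its timeoutMs parameter are hypothetical names, and the sketch assumes the change stream supports on() and off() as a standard event emitter does. It stands in only for the second pause in the example; it does not address the first pause, which gives the listener time to register before the insert.

```javascript
// Hypothetical helper: resolve with the next change event, or reject on a
// stream error or when no event arrives within timeoutMs milliseconds.
function waitForChange(changeStream, timeoutMs) {
  return new Promise((resolve, reject) => {
    // Remove both listeners and the timer once the promise settles.
    const cleanup = () => {
      clearTimeout(timer);
      changeStream.off("change", onChange);
      changeStream.off("error", onError);
    };
    const onChange = (changeEvent) => {
      cleanup();
      resolve(changeEvent);
    };
    const onError = (error) => {
      cleanup();
      reject(error);
    };
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error(`No change event within ${timeoutMs} ms`));
    }, timeoutMs);
    changeStream.on("change", onChange);
    changeStream.on("error", onError);
  });
}

// Assumed usage inside run(), replacing the second simulateAsyncPause() call:
// const pending = waitForChange(changeStream, 5000);
// await haikus.insertOne({ title: "..." });
// console.log("received a change to the collection: \t", await pending);
```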
Visit the following resources for additional material on the classes and methods mentioned on this page: