Docs Menu
Docs Home
/ / /
Node.js Driver
/

Watch for Changes

You can watch for changes in MongoDB using the watch() method on the following objects:

  • Collection

  • Database

  • MongoClient

For each object, the watch() method opens a change stream to emit change event documents when they occur.

The watch() method optionally takes an aggregation pipeline which consists of an array of aggregation stages as the first parameter. The aggregation stages filter and transform the change events.

In the following snippet, the $match stage matches all change event documents with a runtime value of less than 15, filtering all others out.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

The watch() method accepts an options object as the second parameter. Refer to the links at the end of this section for more information on the settings you can configure with this object.

The watch() method returns an instance of a ChangeStream. You can read events from change streams by iterating over them or listening for events.

Select the tab that corresponds to the way you want to read events from the change stream:

Starting in version 4.12, ChangeStream objects are async iterables. With this change, you can use for-await loops to retrieve events from an open change stream:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

You can call methods on the ChangeStream object such as:

  • hasNext() to check for remaining documents in the stream

  • next() to request the next document in the stream

  • close() to close the ChangeStream

You can attach listener functions to the ChangeStream object by calling the on() method. This method is inherited from the Javascript EventEmitter class. Pass the string "change" as the first parameter and your listener function as the second parameter as shown below:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

The listener function triggers when a change event is emitted. You can specify logic in the listener to process the change event document when it is received.

You can control the change stream by calling pause() to stop emitting events or resume() to continue to emit events.

To stop processing change events, call the close() method on the ChangeStream instance. This closes the change stream and frees resources.

changeStream.close();

Warning

Using a ChangeStream in EventEmitter and Iterator mode concurrently is not supported by the driver and causes an error. This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.

The following example opens a change stream on the haikus collection in the insertDB database and prints change events as they occur:

Note

You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Usage Examples guide.

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);
1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

Note

Identical Code Snippets

The JavaScript and TypeScript code snippets above are identical. There are no TypeScript specific features of the driver relevant to this use case.

When you run this code and then make a change to the haikus collection, such as performing an insert or delete operation, you can see the change event document printed in your terminal.

For example, if you insert a document to the collection, the code prints the following output:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Note

Receive Full Documents From Updates

Change events that contain information on update operations only return the modified fields by default rather than the full updated document. You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

The following example opens a change stream on the haikus collection in the insertDB database. Let's create a listener function to receive and print change events that occur on the collection.

First, open the change stream on the collection and then define a listener on the change stream using the on() method. Once you set the listener, generate a change event by performing a change to the collection.

To generate the change event on the collection, let's use the insertOne() method to add a new document. Since insertOne() may run before the listener function can register, we use a timer, defined as simulateAsyncPause to wait 1 second before executing the insert.

We also use simulateAsyncPause after the insertion of the document. This provides ample time for the listener function to receive the change event and for the listener to complete its execution before closing the ChangeStream instance using the close() method.

Note

Reason to include timers

The timers used in this example are only for demonstration purposes. They make sure that there is enough time to register the listener and have the listener process the change event before exiting.

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);
1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

Note

Identical Code Snippets

The JavaScript and TypeScript code snippets above are identical. There are no TypeScript specific features of the driver relevant to this use case.

Visit the following resources for more material on the classes and methods mentioned on this page:

Back

Run a Command