Watch for Changes
Open a Change Stream
You can watch for changes in MongoDB using the watch() method on a Collection, Db, or MongoClient object.
For each object, the watch() method opens a change stream that emits change event documents as changes occur.
The watch() method optionally takes an aggregation pipeline, an array of aggregation stages, as its first parameter. The aggregation stages filter and transform the change events.
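In the following snippet, the $match stage matches all change event documents whose fullDocument has a runtime value of less than 15, filtering all others out. The filter uses the fullDocument.runtime path because the changed document is nested under the fullDocument field of each change event.
const pipeline = [ { $match: { "fullDocument.runtime": { $lt: 15 } } } ];
const changeStream = collection.watch(pipeline);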
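A pipeline can also filter on fields of the change event itself and reshape what the stream emits. The following is a minimal sketch, not part of the original guide: the helper name buildInsertPipeline and the choice of projected fields are assumptions made for illustration.

```javascript
// Hypothetical helper: builds a pipeline that keeps only insert events
// and trims each event down to a few fields.
function buildInsertPipeline() {
  return [
    // Keep only events produced by insert operations.
    { $match: { operationType: "insert" } },
    // Inclusion projection. The _id field, which holds the resume token,
    // is kept by default and should not be removed.
    { $project: { operationType: 1, documentKey: 1, "fullDocument.title": 1 } },
  ];
}

const insertPipeline = buildInsertPipeline();
console.log(JSON.stringify(insertPipeline, null, 2));

// With a connected collection, the pipeline would be passed as the
// first parameter: collection.watch(insertPipeline)
```

Building the pipeline in a function keeps the stages in one place if several change streams share the same filter.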
The watch() method accepts an options object as the second parameter. Refer to the links at the end of this section for more information on the settings you can configure with this object.
The watch() method returns an instance of a ChangeStream. You can read events from change streams by iterating over them or listening for events.
Warning
Avoid mixing the iterative and event-based approaches to reading from a change stream, as doing so can cause unexpected behavior.
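The two reading styles can be sketched as follows. This is an illustration under stated assumptions: the function names are hypothetical, and small stand-in objects replace a real ChangeStream so that the snippet runs without a MongoDB deployment. With the driver, you would pass the object returned by watch() instead, and use only one style per change stream.

```javascript
const { EventEmitter } = require("events");

// Event-based style: register a callback for "change" events.
function listenForChanges(changeStream, handler) {
  changeStream.on("change", handler);
}

// Iterative style: pull events one at a time with hasNext() and next().
// maxEvents is a hypothetical cap so that the loop in this sketch ends.
async function readChanges(changeStream, maxEvents) {
  const events = [];
  while (events.length < maxEvents && (await changeStream.hasNext())) {
    events.push(await changeStream.next());
  }
  return events;
}

// Stand-in for the event-based style: a plain EventEmitter.
const emitterStream = new EventEmitter();
const received = [];
listenForChanges(emitterStream, (change) => received.push(change.operationType));
emitterStream.emit("change", { operationType: "insert" });
console.log("event-based:", received);

// Stand-in for the iterative style: serves events from an in-memory queue.
// Unlike this stand-in, a real change stream waits for the next event.
function fakeIterableStream(queue) {
  return {
    hasNext: async () => queue.length > 0,
    next: async () => queue.shift(),
  };
}

readChanges(
  fakeIterableStream([{ operationType: "insert" }, { operationType: "delete" }]),
  5
).then((events) => console.log("iterative:", events.map((e) => e.operationType)));
```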
Visit the following resources for additional material on the classes and methods presented above:
Example
The following example opens a change stream on the haikus collection in the insertDB database. It creates a listener function that receives and prints the change events that occur on the collection.
First, open the change stream on the collection and then define a callback on the change stream using the on() method. Once the callback is set, generate a change event by modifying the collection.
To generate the change event on the collection, use the insertOne() method to add a new document. Since insertOne() may run before the listener function can register, the example uses a timer, defined as simulateAsyncPause, to wait 1 second before executing the insert.
The example also calls simulateAsyncPause after inserting the document. This gives the listener function ample time to receive the change event, and the callback time to finish, before the ChangeStream instance is closed using the close() method.
The timers used in this example are only necessary for this demonstration: they make sure there is enough time to register the listener and for the callback to process the event before the program exits.
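One alternative to a fixed pause after the insert is to wait until a change event actually arrives. The sketch below is an assumption-laden illustration rather than part of the original example: waitForChange is a hypothetical helper, and an EventEmitter stands in for the change stream so that the snippet runs on its own. It relies only on the change stream emitting "change" events, as the listener in the example does.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: resolves with the first "change" event, or rejects
// if none arrives within timeoutMs milliseconds.
function waitForChange(changeStream, timeoutMs = 5000) {
  return new Promise((resolve, reject) => {
    const onChange = (change) => {
      clearTimeout(timer); // stop the timeout so it cannot fire later
      resolve(change);
    };
    const timer = setTimeout(() => {
      changeStream.off("change", onChange); // stop listening after the timeout
      reject(new Error(`no change event within ${timeoutMs} ms`));
    }, timeoutMs);
    changeStream.once("change", onChange);
  });
}

// Stand-in for a change stream in this sketch.
const demoStream = new EventEmitter();
waitForChange(demoStream, 1000).then((change) => {
  console.log("first change:", change.operationType);
});
demoStream.emit("change", { operationType: "insert" });
```

This does not replace the pause before the insert, which exists so that the listener is registered before the write happens.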
Note
This example connects to an instance of MongoDB and uses a sample data database. To learn more about connecting to your MongoDB instance and loading this database, see the Usage Examples guide.
const { MongoClient } = require("mongodb");

// Replace the uri string with your MongoDB deployment's connection string.
const uri = "mongodb+srv://<user>:<password>@<cluster-url>?writeConcern=majority";

const client = new MongoClient(uri, {
  useNewUrlParser: true,
  useUnifiedTopology: true,
});

const simulateAsyncPause = () =>
  new Promise(resolve => {
    setTimeout(() => resolve(), 1000);
  });

let changeStream;

async function run() {
  try {
    await client.connect();
    const database = client.db("insertDB");
    const collection = database.collection("haikus");

    // open a Change Stream on the "haikus" collection
    changeStream = collection.watch();

    // set up a listener when change events are emitted
    changeStream.on("change", next => {
      // process any change event
      console.log("received a change to the collection: \t", next);
    });

    await simulateAsyncPause();

    await collection.insertOne({
      title: "Record of a Shriveled Datum",
      content: "No bytes, no problem. Just insert a document, in MongoDB",
    });

    await simulateAsyncPause();

    await changeStream.close();
    console.log("closed the change stream");
  } finally {
    await client.close();
  }
}

run().catch(console.dir);
If you run the example above, you should see output similar to the following:
received a change to the collection:     {
  _id: { _data: '825EC...' },
  operationType: 'insert',
  clusterTime: new Timestamp { ... },
  fullDocument: {
    _id: new ObjectId(...),
    title: 'Record of a Shriveled Datum',
    content: 'No bytes, no problem. Just insert a document, in MongoDB'
  },
  ns: { db: 'insertDB', coll: 'haikus' },
  documentKey: { _id: new ObjectId(...) }
}
Note
Receive Full Documents From Updates
Change events that contain information on update operations return only the modified fields by default rather than the full updated document. You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:
const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = collection.watch(pipeline, options);
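To make the difference concrete, the sketch below inspects an update event with and without the full document. The describeChange helper and both sample events are hypothetical and hand-written for illustration; they only mimic the updateDescription and fullDocument fields of a change event, and are not captured driver output.

```javascript
// Hypothetical helper: summarizes a change event in one line.
function describeChange(event) {
  if (event.operationType !== "update") {
    return `${event.operationType} event`;
  }
  // Update events list the modified fields under updateDescription.
  const changed = Object.keys(event.updateDescription.updatedFields);
  // fullDocument appears on update events only when the stream was opened
  // with fullDocument: "updateLookup".
  const hasFullDocument = event.fullDocument != null;
  return `update of [${changed.join(", ")}], full document ${
    hasFullDocument ? "included" : "not included"
  }`;
}

// Illustrative update event as it might look with the default options.
const defaultEvent = {
  operationType: "update",
  updateDescription: { updatedFields: { title: "Revised title" }, removedFields: [] },
};

// Illustrative update event as it might look with "updateLookup".
const lookupEvent = {
  ...defaultEvent,
  fullDocument: { title: "Revised title", content: "Unchanged content" },
};

console.log(describeChange(defaultEvent));
console.log(describeChange(lookupEvent));
```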