注意更改
打开变更流
可以在以下对象上使用 watch()
方法来监视 MongoDB 中的更改:
对于每个对象,watch()
方法都会打开一个更改流,以便在更改事件发生时发出更改事件文档。
watch()
方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数。聚合阶段筛选和转换变更事件。
在以下代码段中,$match
阶段匹配 runtime
值小于 15 的所有更改事件文档,并过滤掉所有其他文档。
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
watch()
方法将 options
对象作为第二个参数。有关此对象配置设置的更多信息,请参阅本节末尾的链接。
watch()
方法返回 ChangeStream 的实例。您可以通过迭代变更流或监听事件,读取变更流中的事件。
根据您想从变更流中读取事件的方式选择相应的标签页:
从版本 4.12 开始,ChangeStream
对象是异步可遍历对象。通过此更改,您可以使用 for-await
循环从打开的变更流中检索事件:
for await (const change of changeStream) { console.log("Received change: ", change); }
您可以调用ChangeStream
对象上的方法,例如:
hasNext()
检查流中是否有剩余文档next()
请求流中的下一个文档close()
关闭 ChangeStream
您可以通过调用 on()
方法将监听函数附加到 ChangeStream
对象。此方法继承自 Javascript EventEmitter
类。如下所示,将 string "change"
作为第一个参数传递,将监听函数作为第二个参数传递:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
监听器函数在发出change
事件时触发。 您可以在侦听器中指定逻辑,以便在收到更改事件文档时对其进行进程。
您可以通过调用 pause()
停止触发事件或调用 resume()
继续触发事件来控制变更流。
要停止处理更改事件,请调用 close() ChangeStream
实例上的方法。这将关闭变更流并释放资源。
changeStream.close();
警告
驱动程序不支持在 EventEmitter
和 Iterator
模式下同时使用 ChangeStream
,并会导致错误。这是为了防止未定义的行为,在这种行为中,驱动程序无法保证哪个使用者先接收文档。
示例
迭代
以下示例在 insertDB
数据库中的 haikus
集合上打开一个变更流,并在发生变更事件时打印变更事件:
注意
可以使用此示例连接到 MongoDB 实例,并与包含样本数据的数据库进行交互。如需了解有关连接到 MongoDB 实例和加载示例数据集的更多信息,请参阅 使用示例指南 。
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 let changeStream; 9 async function run() { 10 try { 11 const database = client.db("insertDB"); 12 const haikus = database.collection("haikus"); 13 14 // Open a Change Stream on the "haikus" collection 15 changeStream = haikus.watch(); 16 17 // Print change events 18 for await (const change of changeStream) { 19 console.log("Received change:\n", change); 20 } 21 22 await changeStream.close(); 23 24 } finally { 25 await client.close(); 26 } 27 } 28 run().catch(console.dir);
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 let changeStream; 9 async function run() { 10 try { 11 const database = client.db("insertDB"); 12 const haikus = database.collection("haikus"); 13 14 // Open a Change Stream on the "haikus" collection 15 changeStream = haikus.watch(); 16 17 // Print change events 18 for await (const change of changeStream) { 19 console.log("Received change:\n", change); 20 } 21 22 await changeStream.close(); 23 24 } finally { 25 await client.close(); 26 } 27 } 28 run().catch(console.dir);
注意
相同的代码片段
上述 JavaScript 和 TypeScript 代码片段完全相同。驱动程序没有与此使用案例相关的特定于 TypeScript 的功能。
当您运行此代码,然后对 haikus
集合进行更改(例如执行插入或删除操作)时,您可以在终端中看到打印的变更事件文档。
例如,如果将一个文档插入到集合中,上述代码将打印以下输出:
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
注意
从更新接收完整文档
默认情况下,包含更新操作信息的更改事件仅返回修改后的字段,而不是完整的更新文档。您可以将更改流配置为也返回文档的最新版本,方法是将选项对象的 fullDocument
字段设置为 "updateLookup"
,如下所示:
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
监听函数
以下示例在 insertDB
数据库中的 haikus
集合上打开一个变更流。让我们创建一个监听函数来接收和打印集合上发生的变更事件。
首先,打开集合上的变更流,然后使用 on()
方法在变更流上定义侦听器。设置侦听器后,通过对集合执行更改来生成变更事件。
要在集合上生成更改事件,我们使用 insertOne()
方法添加新文档。由于 insertOne()
可能会在监听函数注册之前运行,因此我们使用定义为 simulateAsyncPause
的计时器,在插入之前等待 1 秒。
我们还在插入文档后使用 simulateAsyncPause
。这为侦听器函数提供充足的时间来接收变更事件,并让侦听器在使用 close()
方法关闭 ChangeStream
实例之前完成自身的执行。
注意
包含计时器的原因
此示例中使用的计时器仅用于演示。它们可以确保有足够的时间来注册监听器并让监听器能在退出之前处理变更事件。
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 const simulateAsyncPause = () => 9 new Promise(resolve => { 10 setTimeout(() => resolve(), 1000); 11 }); 12 13 let changeStream; 14 async function run() { 15 try { 16 const database = client.db("insertDB"); 17 const haikus = database.collection("haikus"); 18 19 // open a Change Stream on the "haikus" collection 20 changeStream = haikus.watch(); 21 22 // set up a listener when change events are emitted 23 changeStream.on("change", next => { 24 // process any change event 25 console.log("received a change to the collection: \t", next); 26 }); 27 28 await simulateAsyncPause(); 29 30 await myColl.insertOne({ 31 title: "Record of a Shriveled Datum", 32 content: "No bytes, no problem. Just insert a document, in MongoDB", 33 }); 34 35 await simulateAsyncPause(); 36 37 await changeStream.close(); 38 39 console.log("closed the change stream"); 40 } finally { 41 await client.close(); 42 } 43 } 44 run().catch(console.dir);
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 const simulateAsyncPause = () => 9 new Promise(resolve => { 10 setTimeout(() => resolve(), 1000); 11 }); 12 13 let changeStream; 14 async function run() { 15 try { 16 const database = client.db("insertDB"); 17 const haikus = database.collection("haikus"); 18 19 // open a Change Stream on the "haikus" collection 20 changeStream = haikus.watch(); 21 22 // set up a listener when change events are emitted 23 changeStream.on("change", next => { 24 // process any change event 25 console.log("received a change to the collection: \t", next); 26 }); 27 28 await simulateAsyncPause(); 29 30 await myColl.insertOne({ 31 title: "Record of a Shriveled Datum", 32 content: "No bytes, no problem. Just insert a document, in MongoDB", 33 }); 34 35 await simulateAsyncPause(); 36 37 await changeStream.close(); 38 39 console.log("closed the change stream"); 40 } finally { 41 await client.close(); 42 } 43 } 44 run().catch(console.dir);
注意
相同的代码片段
上述 JavaScript 和 TypeScript 代码片段完全相同。驱动程序没有与此使用案例相关的特定于 TypeScript 的功能。
请访问以下资源以获取有关本页提到的类和方法的更多信息: