Docs 菜单
Docs 主页
/ / /
Node.js
/

注意更改

可以在以下对象上使用 watch() 方法来监视 MongoDB 中的更改:

  • Collection

  • 数据库

  • MongoClient

对于每个对象,watch() 方法都会打开一个更改流,以便在更改事件发生时发出更改事件文档。

watch() 方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数。聚合阶段筛选和转换变更事件。

在以下代码段中,$match 阶段匹配 runtime 值小于 15 的所有更改事件文档,并过滤掉所有其他文档。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

watch() 方法将 options 对象作为第二个参数。有关此对象配置设置的更多信息,请参阅本节末尾的链接。

watch()方法返回 ChangeStream 的实例 。您可以通过迭代变更流或监听事件来读取变更流中的事件。

根据您想从变更流中读取事件的方式选择相应的标签页:

从版本 4.12 开始,ChangeStream 对象是异步可遍历对象。通过此更改,您可以使用 for-await 循环从打开的变更流中检索事件:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

您可以调用ChangeStream对象上的方法,例如:

  • hasNext() 检查流中是否有剩余文档

  • next() 请求流中的下一个文档

  • close() 关闭 ChangeStream

您可以通过调用on()方法将侦听器函数附加到ChangeStream对象。此方法继承自 Javascript EventEmitter类。传递字符串"change"作为第一个参数,并将侦听器函数作为第二个参数,如下所示:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

监听器函数在发出change事件时触发。您可以在侦听器中指定逻辑,以便在收到更改事件文档时对其进行处理。

您可以通过调用pause()停止发出事件或调用resume()继续发出事件来控制变更流。

要停止处理更改事件,请调用 close() ChangeStream实例上的方法。这将关闭变更流并释放资源。

changeStream.close();

警告

驱动程序不支持在 EventEmitterIterator 模式下同时使用 ChangeStream,并会导致错误。这是为了防止未定义的行为,在这种行为中,驱动程序无法保证哪个使用者先接收文档。

以下示例在 insertDB 数据库中的 haikus 集合上打开一个变更流,并在发生变更事件时打印变更事件:

注意

您可以使用此示例连接到 MongoDB 实例,并与包含样本数据的数据库进行交互。要了解有关连接到 MongoDB 实例和加载样本数据集的更多信息,请参阅使用示例指南。

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);

注意

相同的代码片段

上述 JavaScript 和 TypeScript 代码片段完全相同。驱动程序没有与此使用案例相关的特定于 TypeScript 的功能。

当您运行此代码,然后对 haikus 集合进行更改(例如执行插入或删除操作)时,您可以在终端中看到打印的变更事件文档。

例如,如果将一个文档插入到集合中,上述代码将打印以下输出:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

注意

从更新接收完整文档

默认情况下,包含更新操作信息的更改事件仅返回修改后的字段,而不是完整的更新文档。您可以将更改流配置为也返回文档的最新版本,方法是将选项对象的 fullDocument 字段设置为 "updateLookup",如下所示:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

以下示例在 insertDB 数据库中的 haikus 集合上打开一个变更流。让我们创建一个监听函数来接收和打印集合上发生的变更事件。

首先,打开集合上的变更流,然后使用 on() 方法在变更流上定义侦听器。设置侦听器后,通过对集合执行更改来生成变更事件。

要在集合上生成更改事件,我们使用 insertOne() 方法添加新文档。由于 insertOne() 可能会在监听函数注册之前运行,因此我们使用定义为 simulateAsyncPause 的计时器,在插入之前等待 1 秒。

我们还在插入文档后使用 simulateAsyncPause。这为侦听器函数提供充足的时间来接收变更事件,并让侦听器在使用 close() 方法关闭 ChangeStream 实例之前完成自身的执行。

注意

包含计时器的原因

此示例中使用的计时器仅用于演示。它们可以确保有足够的时间来注册监听器并让监听器能在退出之前处理变更事件。

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);

注意

相同的代码片段

上述 JavaScript 和 TypeScript 代码片段完全相同。驱动程序没有与此使用案例相关的特定于 TypeScript 的功能。

请访问以下资源以获取有关本页提到的类和方法的更多信息:

后退

运行命令