Docs Menu
Docs Home
/ / /
Node.js
/

変更の監視

次のオブジェクトの watch()メソッドを使用して、MongoDB の変更を監視できます。

各オブジェクトに対して、 watch()メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。

オプションとして、watch() メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。

次のスニペットでは、$match ステージで runtime の値が 15 未満のすべての変更イベントドキュメントが照合され、その他のドキュメントがすべて除外されます。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

watch()メソッドは、2 番目のパラメーターとしてoptionsオブジェクトを受け入れます。このオブジェクトで構成できる設定の詳細については、このセクションの最後にあるリンクを参照してください。

watch()メソッドは、 変更ストリーム のインスタンスを返します 。変更ストリームを反復処理したり、イベントをリスニングしたりすることで、変更ストリームからイベントを読み取ることができます。

変更ストリームからイベントを読み取る方法を示すタブを選択します。

バージョン 4.12 以降、 ChangeStreamオブジェクトは非同期反復可能オブジェクトになります。この変更により、 for-awaitループを使用して、開いている変更ストリームからイベントを取得できるようになります。

for await (const change of changeStream) {
console.log("Received change: ", change);
}

次のようなChangeStreamオブジェクト上のメソッドを呼び出すことができます。

  • hasNext() ストリーム内の残りのドキュメントを確認する

  • next() ストリーム内の次のドキュメントをリクエストする

  • close() ChangeStream を閉じる

on() メソッドを呼び出すことによって、ChangeStream オブジェクトに listner 関数をアタッチできます。このメソッドは、Javascript EventEmitter クラスから継承されます。以下のように、最初のパラメーターとして文字列"change" を渡し、2 番目のパラメーターとして listner 関数を渡します。

changeStream.on("change", (changeEvent) => { /* your listener function */ });

リスナー関数は、 changeイベントが発生したときにトリガーされます。変更イベント ドキュメントを受信時に処理するよう、リスナー内でロジックを指定できます。

pause()を呼び出してイベントの発行を停止するか、 resume()呼び出してイベントの発行を継続することで、変更ストリームを制御できます。

変更イベントの処理を停止するには、ChangeStreamインスタンスで close() メソッドを呼び出します。これにより、変更ストリームが閉じられ、リソースが解放されます。

changeStream.close();

警告

EventEmitter モードと Iterator モードでのChangeStreamの使用は、ドライバーでサポートされていないため、エラーが発生します。これは、ドキュメントを最初に受け取るコンシューマーをドライバー側で保証できない場合の未定義の動作を防ぐためのものです。

次の例ではinsertDBデータベースのhaikusコレクションの変更ストリームがオープンされ、変更イベントが発生に応じて出力されます。

注意

この例を使用して、MongoDB のインスタンスに接続し、サンプルデータを含むデータベースと交流できます。MongoDB インスタンスへの接続とサンプルデータセットの読み込みの詳細については、 使用例ガイドを参照してください。

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);

注意

同一のコードスニペット

上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。

このコードを実行してから、挿入または削除操作の実行など、 haikusコレクションに変更を加えると、変更イベント ドキュメントがターミナルに出力されます。

たとえば、コレクションにドキュメントを挿入すると、コードによって次の内容が出力されます。

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

注意

更新から完全なドキュメントを受け取る

アップデート操作に関する情報を含む変更イベントは、デフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。次のように、オプションオブジェクトの fullDocument フィールドを "updateLookup" に設定することで、ドキュメントの最新バージョンも返すように変更ストリームを設定できます。

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

次の例では、 insertDBデータベースのhaikusコレクションの変更ストリームを開きます。コレクションで発生する変更イベントを受信して出力するリスナー関数を作成します。

まず、コレクションで変更ストリームを開き、on() メソッドを使用して、変更ストリーム上でリスナーを定義します。リスナーを設定したら、コレクションに対する変更を実行して変更イベントを生成します。

コレクション上で変更イベントを生成するために、insertOne() メソッドを使用して新しいドキュメントを追加します。insertOne() がリスナー関数の登録より先に実行される可能性があるため、simulateAsyncPause と定義したタイマーを使用し、1 秒間待ってから挿入を実行します。

また、ドキュメント挿入後に simulateAsyncPause も使用します。これにより、close() メソッドによって ChangeStream インスタンスが閉じる前に、リスナー関数が変更イベントを受け取って処理を完了するための十分な時間を確保できます。

注意

タイマーを使用する理由

この例で使用されているタイマーは、あくまでもデモンストレーションのためのものです。これにより、リスナーの登録と、終了前のリスナーによる変更イベントの処理に十分な時間を確保できます。

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);

注意

同一のコードスニペット

上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。

このページで言及されているクラスとメソッドに関する追加資料については、次のリソースを参照してください。

戻る

コマンドの実行