Docs Menu
Docs Home
/ / /
Node.js ドライバー
/

変更の監視

次のオブジェクトの watch()メソッドを使用して、MongoDB の変更を監視できます。

  • コレクション

  • Database

  • MongoClient

各オブジェクトに対して、 watch()メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。

オプションとして、watch() メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。

次のスニペットでは、$match ステージで runtime の値が 15 未満のすべての変更イベントドキュメントが照合され、その他のドキュメントがすべて除外されます。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

watch()メソッドは、2 番目のパラメーターとしてoptionsオブジェクトを受け入れます。このオブジェクトで構成できる設定の詳細については、このセクションの最後にあるリンクを参照してください。

watch()メソッドは、 変更ストリーム のインスタンスを返します 。変更ストリームを反復処理したり、イベントをリスニングしたりすることで、変更ストリームからイベントを読み取ることができます。

変更ストリームからイベントを読み取る方法を示すタブを選択します。

バージョン 4.12 以降、 ChangeStreamオブジェクトは非同期反復可能オブジェクトになります。この変更により、 for-awaitループを使用して、開いている変更ストリームからイベントを取得できるようになります。

for await (const change of changeStream) {
console.log("Received change: ", change);
}

次のようなChangeStreamオブジェクト上のメソッドを呼び出すことができます。

  • hasNext() ストリーム内の残りのドキュメントを確認する

  • next() ストリーム内の次のドキュメントをリクエストする

  • close() ChangeStream を閉じる

on() メソッドを呼び出すことによって、ChangeStream オブジェクトに listner 関数をアタッチできます。このメソッドは、Javascript EventEmitter クラスから継承されます。以下のように、最初のパラメーターとして文字列"change" を渡し、2 番目のパラメーターとして listner 関数を渡します。

changeStream.on("change", (changeEvent) => { /* your listener function */ });

リスナー関数は、 changeイベントが発生したときにトリガーされます。変更イベント ドキュメントを受信時に処理するよう、リスナー内でロジックを指定できます。

pause()を呼び出してイベントの発行を停止するか、 resume()呼び出してイベントの発行を継続することで、変更ストリームを制御できます。

変更イベントの処理を停止するには、ChangeStreamインスタンスで close() メソッドを呼び出します。これにより、変更ストリームが閉じられ、リソースが解放されます。

changeStream.close();

警告

EventEmitter モードと Iterator モードでのChangeStreamの使用は、ドライバーでサポートされていないため、エラーが発生します。これは、ドキュメントを最初に受け取るコンシューマーをドライバー側で保証できない場合の未定義の動作を防ぐためのものです。

次の例ではinsertDBデータベースのhaikusコレクションの変更ストリームがオープンされ、変更イベントが発生に応じて出力されます。

注意

この例を使用して、MongoDB のインスタンスに接続し、サンプルデータを含むデータベースと交流できます。MongoDB インスタンスへの接続とサンプルデータセットの読み込みの詳細については、 使用例ガイドを参照してください。

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);
1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

注意

同一のコードスニペット

上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。

このコードを実行してから、挿入または削除操作の実行など、 haikusコレクションに変更を加えると、変更イベント ドキュメントがターミナルに出力されます。

たとえば、コレクションにドキュメントを挿入すると、コードによって次の内容が出力されます。

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

注意

更新から完全なドキュメントを受け取る

アップデート操作に関する情報を含む変更イベントは、デフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。次のように、オプションオブジェクトの fullDocument フィールドを "updateLookup" に設定することで、ドキュメントの最新バージョンも返すように変更ストリームを設定できます。

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

次の例では、 insertDBデータベースのhaikusコレクションの変更ストリームを開きます。コレクションで発生する変更イベントを受信して出力するリスナー関数を作成します。

まず、コレクションで変更ストリームを開き、on() メソッドを使用して、変更ストリーム上でリスナーを定義します。リスナーを設定したら、コレクションに対する変更を実行して変更イベントを生成します。

コレクション上で変更イベントを生成するために、insertOne() メソッドを使用して新しいドキュメントを追加します。insertOne() がリスナー関数の登録より先に実行される可能性があるため、simulateAsyncPause と定義したタイマーを使用し、1 秒間待ってから挿入を実行します。

また、ドキュメント挿入後に simulateAsyncPause も使用します。これにより、close() メソッドによって ChangeStream インスタンスが閉じる前に、リスナー関数が変更イベントを受け取って処理を完了するための十分な時間を確保できます。

注意

タイマーを使用する理由

この例で使用されているタイマーは、あくまでもデモンストレーションのためのものです。これにより、リスナーの登録と、終了前のリスナーによる変更イベントの処理に十分な時間を確保できます。

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);
1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

注意

同一のコードスニペット

上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。

このページで説明されているクラスとメソッドの詳細については、次のリソースを参照してください。

戻る

コマンドの実行