Mongo.watch()
定義
Mongo.watch( pipeline, options )
レプリカセットとシャーディングされたクラスターのみ
レプリカセットまたはシャーディングされたクラスターの変更ストリーム カーソルを開き、
admin
、local
、config
データベースを除き、データベース全体のsystem
以外のすべてのコレクションについてレポートします。 。Parameterタイプ説明pipeline
配列任意。次の集計ステージの1つ以上で構成される集計パイプライン:
変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
options
ドキュメントoptions
ドキュメントには、次のフィールドと値を含めることができます。フィールドタイプ説明resumeAfter
ドキュメント任意。 再開トークンで指定された操作の後に再開通知を試行するように
Mongo.watch()
に指示します。各変更ストリームのイベント ドキュメントには、
_id
フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_id
フィールド全体を渡します。resumeAfter
は、startAfter
およびstartAtOperationTime
と排他関係にあります。startAfter
ドキュメント任意。 再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように
Mongo.watch()
に指示します。 無効化イベント 後に通知を再開できるようにします。各変更ストリームのイベント ドキュメントには、
_id
フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_id
フィールド全体を渡します。startAfter
は、resumeAfter
およびstartAtOperationTime
と排他関係にあります。fullDocument
string任意。 デフォルトでは、アップデートされたドキュメント全体ではなく、アップデート操作によって変更されたフィールドのデルタが返され
Mongo.watch()
。fullDocument
を"updateLookup"
に設定すると、更新されたドキュメントの過半数がコミットした最新のバージョンを参照するようMongo.watch()
に指示します。Mongo.watch()
は、updateDescription
デルタに加えて、ドキュメント検索を含むfullDocument
フィールドを返します。batchSize
整数任意。MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数を指定します。
cursor.batchSize()
と同じ機能を持ちます。maxAwaitTimeMS
整数任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。
デフォルトは
1000
ミリ秒です。collation
ドキュメント任意。 照合ドキュメントを渡し、変更ストリーム カーソルの照合順序を指定します。
省略した場合、デフォルトは
simple
バイナリ比較になります。startAtOperationTime
タイムスタンプ任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、
rs.printReplicationInfo()
を参照してください。startAtOperationTime
は、resumeAfter
およびstartAfter
と排他関係にあります。次の値を返します。 変更イベント ドキュメント上のカーソル。 変更イベント ドキュメントの例については、「変更イベント」を参照してください。
可用性
配置
Mongo.watch()
は、レプリカセットとシャーディングされたクラスターで利用できます。
レプリカセットの場合、データを保持している任意のメンバーに対して
Mongo.watch()
を発行できます。シャーディングされたクラスターの場合、 インスタンスで
Mongo.watch()
mongos
を発行する必要があります。
ストレージ エンジン
読み取り保証 (read concern)majority
サポート
MongoDB 4.2 以降では、"majority"
の読み取り保証 (read concern)のサポートに関係なく変更ストリームが利用可能になりました。つまり、変更ストリームを使用する際、読み取り保証 (read concern) のmajority
サポートを有効にする(デフォルト)か無効にするかを選択できます。
MongoDB 4.0 以前では、変更ストリームは、"majority"
読み取り保証(read concern)サポートが有効(デフォルト)の場合にのみ使用できます。
動作
Mongo.watch()
データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
再開可能性
MongoDBドライバーとは異なり、 mongosh
はエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。
Mongo.watch()
は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfter
またはstartAfter
オプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 Mongo.watch()
は変更ストリームを再開できません。
変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfter
を使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。
注意
再開トークン
再開トークンの_data
タイプは、MongoDB のバージョンによって異なります。また、変更ストリームの開始時または再開時に機能の互換性バージョン(fcv)が依存する場合があります(つまり、fcv 値の変更はすでに有効な再開トークンには影響しません)。開かれた変更ストリーム)の場合
MongoDB バージョン | 機能の互換性バージョン | 再開トークン _data タイプ |
---|---|---|
MongoDB 4.2 以降 | "4.2" または "4.0" | 16進数でエンコードされたstring ( v1 ) |
MongoDB 4.0.7 以降 | "4.0" または "3.6" | 16進数でエンコードされたstring ( v1 ) |
MongoDB 4.0.6 以前 | "4.0" | 16進数でエンコードされたstring ( v0 ) |
MongoDB 4.0.6 以前 | "3.6" | BinData |
MongoDB 3.6 | "3.6" | BinData |
16 進数でエンコードされたトークン
16 進数でエンコードされた string の再開トークンを使用すると、再開トークンを比較して並べ替えることができます。
FCV の値に関係なく、4.0 配置では、BinData 再開トークンまたは 16 進数string再開トークンのいずれかを使用して変更ストリームを再開できます。 そのため、4.0 配置では、3.6 配置のコレクションで開かれた変更ストリームからの再開トークンを使用できます。
MongoDB バージョンで導入された新しい再開トークン形式は、以前のバージョンの MongoDB では使用できません。
再開トークンのデコード
MongoDB に備わる 「スニペット」は、16 進数でエンコードされた再開トークンを解読する mongosh
の拡張機能です。
再開トークンをインストールして実行できますmongosh
{ からのスニペット :
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
再開トークン を実行することもできます mongosh
npm
システムに がインストールされている場合は、コマンドラインからの実行( を使用せずに)。
npx mongodb-resumetoken-decoder <RESUME TOKEN>
下記についての詳細は、参照先をご覧ください。
アップデート 操作の完全なドキュメント検索
デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。
アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。
可用性
MongoDB 4.2 以降では、"majority"
の読み取り保証 (read concern)のサポートに関係なく変更ストリームが利用可能になりました。つまり、変更ストリームを使用する際、読み取り保証 (read concern) のmajority
サポートを有効にする(デフォルト)か無効にするかを選択できます。
MongoDB 4.0 以前では、変更ストリームは、"majority"
読み取り保証(read concern)サポートが有効(デフォルト)の場合にのみ使用できます。
アクセス制御
アクセス制御を使用して実行中の場合、ユーザーは すべてのデータベースにわたるすべての非システムfind
changeStream
コレクション に対して および 特権アクションを持っている必要があります。つまり、ユーザーは、次の 特権 を付与する ロール を持っていなければなりません。
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
組み込みの read
ロールにより、適切な権限が付与されます。
カーソルの反復
MongoDB には、カーソルを反復処理する方法が複数用意されています。
cursor.hasNext()
メソッドの場合、ブロックして次のイベントを待機します。watchCursor
カーソルを監視してイベントを反復処理するには、次のように hasNext()
を使用します。
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
cursor.tryNext()
メソッドはノンブロッキングです。watchCursor
カーソルを監視してイベントを反復処理するには、次のように tryNext()
を使用します。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例
mongosh
の次の操作は、レプリカセット上で変更ストリーム カーソルを開きます。 返されたカーソルは、system
データベース、admin
データベース、local
データベースを除く全データベースにわたるすべてのconfig
以外のコレクションに対するデータ変更を報告します。
watchCursor = db.getMongo().watch()
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed()
メソッドを cursor.tryNext()
メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
注意
変更ストリームでは isExhausted()
を使用できません。