db.collection.watch()
項目一覧
定義
db.collection.watch( pipeline, options )
重要
mongosh メソッド
このページでは、
mongosh
メソッドについて説明します。ただし、データベースコマンドや Node.js などの言語固有のドライバーのドキュメントには該当しません。データベースコマンドについては、
$changeStream
集計ステージのaggregate
コマンドを参照してください。MongoDB API ドライバーについては、各言語の MongoDB ドライバー ドキュメントを参照してください。
レプリカセットとシャーディングされたクラスターのみ
コレクション上で変更ストリーム カーソルを開きます。
Parameterタイプ説明pipeline
配列任意。次の集計ステージの1つ以上で構成される集計パイプライン:
変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
options
ドキュメントoptions
ドキュメントには、次のフィールドと値を含めることができます。フィールドタイプ説明resumeAfter
ドキュメント任意。 再開トークンで指定された操作の後に再開通知を試行するように
watch()
に指示します。各変更ストリームのイベント ドキュメントには、
_id
フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_id
フィールド全体を渡します。resumeAfter
は、startAfter
およびstartAtOperationTime
と排他関係にあります。startAfter
ドキュメント任意。 再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように
watch()
に指示します。 無効化イベント 後に通知を再開できるようにします。各変更ストリームのイベント ドキュメントには、
_id
フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_id
フィールド全体を渡します。startAfter
は、resumeAfter
およびstartAtOperationTime
と排他関係にあります。fullDocument
string任意。 デフォルトでは、アップデートされたドキュメント全体ではなく、アップデート操作によって変更されたフィールドのデルタが返され
watch()
。fullDocument
を"updateLookup"
に設定すると、更新されたドキュメントの過半数がコミットした最新のバージョンを参照するようwatch()
に指示します。watch()
は、updateDescription
デルタに加えて、ドキュメント検索を含むfullDocument
フィールドを返します。MongoDB 6.0 以降では、
fullDocument
を次のように設定できます。"whenAvailable"
これは、ドキュメントの挿入、置換、または更新後に、利用可能な場合、ドキュメントの変更後のイメージを出力するためです。"required"
ドキュメントの挿入、置換、またはアップデート後に、ドキュメントの変更後のイメージを出力します。変更後のイメージが利用できない場合、エラーが発生します。
fullDocumentBeforeChange
string任意。
MongoDB 6.0 以降では、新しい
fullDocumentBeforeChange
フィールドを使用して次のように設定できます。"whenAvailable"
ドキュメントの置換、アップデート、または削除前に、利用可能な場合、ドキュメントの変更前のイメージを出力します。"required"
これは、ドキュメントが置換、更新、削除される前に、ドキュメントの変更前のイメージを出力するためです。変更前のイメージが利用できない場合、エラーが発生します。"off"
変更前のドキュメントのイメージを非表示にします。デフォルトは"off"
です。
batchSize
整数任意。MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数を指定します。
cursor.batchSize()
と同じ機能を持ちます。maxAwaitTimeMS
整数任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。
デフォルトは
1000
ミリ秒です。collation
ドキュメントshowExpandedEvents
ブール値任意。MongoDB 6.0 以降、変更ストリームは、 createIndexes イベントや dropIndexes イベントなどの DDL イベントの変更通知をサポートします。展開されたイベントを変更ストリームに含めるには、
showExpandedEvents
オプションを使用して変更ストリーム カーソルを作成します。バージョン 6.0 で追加。
startAtOperationTime
タイムスタンプ任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、
rs.printReplicationInfo()
を参照してください。startAtOperationTime
は、resumeAfter
およびstartAfter
と排他関係にあります。次の値を返します。 MongoDB 配置への接続が開いていて、コレクションが存在する限り開いたままのカーソル。変更イベント ドキュメントの例については、「変更イベント」を参照してください。
可用性
配置
db.collection.watch()
は次のように、レプリカ セットおよびシャーディングされたクラスターの配置で利用できます。
レプリカセットの場合、データを保持している任意のメンバーに対して
db.collection.watch()
を発行できます。シャーディングされたクラスターの場合、 インスタンスで
db.collection.watch()
mongos
を発行する必要があります。
ストレージ エンジン
読み取り保証 (read concern)majority
サポート
MongoDB 4.2 以降では、"majority"
の読み取り保証 (read concern)のサポートに関係なく変更ストリームが利用可能になりました。つまり、変更ストリームを使用する際、読み取り保証 (read concern) のmajority
サポートを有効にする(デフォルト)か無効にするかを選択できます。
MongoDB 4.0 以前では、変更ストリームは、"majority"
読み取り保証(read concern)サポートが有効(デフォルト)の場合にのみ使用できます。
動作
db.collection.watch()
データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
再開可能性
MongoDBドライバーとは異なり、 mongosh
はエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。
db.collection.watch()
は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfter
またはstartAfter
オプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 db.collection.watch()
は変更ストリームを再開できません。
変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfter
を使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、 resumeAfter
を使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.
アップデート 操作の完全なドキュメント検索
デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。
アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。
アクセス制御
アクセス制御を使用して実行中の場合、ユーザーは コレクション リソースに対して find
および changeStream
の特権アクションを持っている必要があります。つまり、ユーザーは、次の特権を付与するロールを持っていなければなりません。
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
組み込みの read
ロールにより、適切な権限が付与されます。
カーソルの反復
MongoDB には、カーソルを反復処理する方法が複数用意されています。
cursor.hasNext()
メソッドの場合、ブロックして次のイベントを待機します。watchCursor
カーソルを監視してイベントを反復処理するには、次のように hasNext()
を使用します。
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
cursor.tryNext()
メソッドはノンブロッキングです。watchCursor
カーソルを監視してイベントを反復処理するには、次のように tryNext()
を使用します。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例
変更ストリームを開く
次の操作は、 data.sensors
コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch()
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed()
メソッドを cursor.tryNext()
メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
注意
変更ストリームでは isExhausted()
を使用できません。
変更ストリームで updateLookup に fullDocument オプションを使用する
fullDocument
オプションを "updateLookup"
に設定すると、アップデートされた変更ストリーム イベントに関連するドキュメントについて、過半数がコミットした最新のバージョンを参照するよう、変更ストリーム カーソルに指示します。
次の操作は、 fullDocument : "updateLookup"
オプションを使用して、data.sensors
コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed()
メソッドを cursor.tryNext()
メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
どのアップデート操作も、変更イベントはドキュメント検索の結果を fullDocument
フィールドに返します。
fullDocumentのアップデート出力の例については、「変更ストリーム アップデート イベント」を参照してください。
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
変更ストリームにおけるドキュメントの変更前と変更後のイメージ
MongoDB 6.0 以降では、変更ストリーム イベントを使用して、変更前と変更後のドキュメントのバージョン(変更前とイメージと変更後のイメージ)を出力できます。
変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
db.createCollection()
、create
、またはcollMod
を使用し、コレクションに対してchangeStreamPreAndPostImages
を有効にします。
変更ストリーム イベントにおいて、次の条件に当てはまる場合、変更前と変更後のイメージは使用できません。
ドキュメントの更新または削除操作時に、コレクションにおいて有効になっていない場合。
expireAfterSeconds
で設定した、変更前と変更後のイメージ保持時間が経過した後に削除された場合。次の例では、クラスター全体で
expireAfterSeconds
を100
秒に設定します。use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 次の例では、
expireAfterSeconds
を含む現在のchangeStreamOptions
設定を返します。db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSeconds
をoff
に設定すると、デフォルトの保持ポリシーが適用されます。対応する変更ストリーム イベントがoplog から削除されるまで、変更前と変更後のイメージは保持されます。変更ストリーム イベントが oplog から削除されると、
expireAfterSeconds
の変更前と変更後のイメージの保持時間にかかわらず、対応する変更前と変更後のイメージも削除されます。
その他の考慮事項
変更前と変更後のイメージを有効にすると、ストレージ容量が消費され、処理時間が増えます。変更前と変更後のイメージは、必要な場合のみ有効にしてください。
変更ストリーム イベントのサイズを 16 メガバイト未満に制限します。イベントのサイズを制限するには、次の方法があります。
ドキュメントのサイズを 8 MB に制限します。
updateDescription
のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、変更ストリーム出力で変更前と変更後のイメージを同時にリクエストできます。updateDescription
のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更後のイメージのみをリクエストしてください。次の場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更前のイメージのみをリクエストしてください。
ドキュメントのアップデートがドキュメントの構造または内容のごく一部にしか影響しない場合、そして
replace
変更イベントが発生しない場合。replace
イベントには、常に変更後のイメージが含まれます。
変更前イメージをリクエストするには、
db.collection.watch()
でfullDocumentBeforeChange
をrequired
またはwhenAvailable
に設定します。変更後イメージをリクエストするには、同じ方法でfullDocument
を設定します。変更前のイメージは
config.system.preimages
コレクションに書き込まれます。config.system.preimages
コレクションが大きくなる場合があります。コレクションのサイズを制限するには、前述のとおり、変更前のイメージにexpireAfterSeconds
時間を設定します。変更前のイメージはバックグラウンド プロセスによって非同期で削除されます。
重要
下位互換性のない機能
MongoDB 6.0 以降では、変更ストリームにドキュメントの変更前のイメージと変更後のイメージを使用している場合、以前の MongoDB バージョンにダウングレードする前に、collMod
コマンドを使用して各コレクションの changeStreamPreAndPostImages を無効にする必要があります。
Tip
以下も参照してください。
変更ストリーム イベントと出力については、「変更イベント」を参照してください。
コレクションの変更を監視するには、
db.collection.watch()
を参照してください。変更ストリーム出力の完全な例については、「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。
コレクションを作成
changeStreamPreAndPostImages が有効になっている temperatureSensor
コレクションを作成します。
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
temperatureSensor
コレクションに読み取った温度の値を入力します。
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
次のセクションでは、temperatureSensor
コレクションを使用する、ドキュメントの変更前と変更後のイメージの変更ストリームの例を示します。
ドキュメントの変更前のイメージを使用する変更ストリーム
ドキュメントの変更前イメージが使用可能な場合は、fullDocumentBeforeChange: "whenAvailable"
設定を使用して出力します。変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
次の例では、fullDocumentBeforeChange:
"whenAvailable"
を使用してtemperatureSensor
コレクションの変更ストリーム カーソルを作成します。
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
次の例では、カーソルを使用して新しい変更ストリーム イベントを確認します。
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
この例では、次のことが行われます。
while
ループはカーソルが閉じるまで実行されます。カーソルにドキュメントがある場合、
hasNext()
はtrue
を返します。
次の例では、temperatureSensor
ドキュメントの reading
フィールドをアップデートします。
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
temperatureSensor
ドキュメントがアップデートされると、変更イベントによってドキュメントの変更前のイメージが fullDocumentBeforeChange
フィールドに出力されます。変更前のイメージには、アップデート前のtemperatureSensor
ドキュメントの reading
フィールドが含まれています。たとえば次のとおりです。
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Tip
以下も参照してください。
ドキュメントのアップデート出力の詳細については、「変更ストリーム のアップデート イベント」を参照してください。
変更ストリームの出力の詳細については、「変更イベント」を参照してください。
ドキュメントの変更後のイメージを使用する変更ストリーム
ドキュメントの変更後のイメージが使用可能な場合は、fullDocument: "whenAvailable"
設定を使用して出力します。変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
次の例では、fullDocument:
"whenAvailable"
を使用してtemperatureSensor
コレクションの変更ストリーム カーソルを作成します。
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
次の例では、カーソルを使用して新しい変更ストリーム イベントを確認します。
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
この例では、次のことが行われます。
while
ループはカーソルが閉じるまで実行されます。カーソルにドキュメントがある場合、
hasNext()
はtrue
を返します。
次の例では、temperatureSensor
ドキュメントの reading
フィールドをアップデートします。
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
temperatureSensor
ドキュメントがアップデートされると、変更イベントによってドキュメントの変更後のイメージが fullDocument
フィールドに出力されます。変更後のイメージには、アップデート後のtemperatureSensor
ドキュメントの reading
フィールドが含まれています。たとえば次のとおりです。
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
以下も参照してください。
ドキュメントのアップデート出力の詳細については、「変更ストリーム のアップデート イベント」を参照してください。
変更ストリームの出力の詳細については、「変更イベント」を参照してください。
変更ストリームで集約パイプライン フィルターを使用する
注意
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
次の操作は、集約パイプラインを使用して data.sensors
コレクションに対する変更ストリーム カーソルを開き、insert
イベントのみをフィルタリングします。
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed()
メソッドを cursor.hasNext()
メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
変更ストリーム カーソルは、 operationType
が insert
である変更イベントのみを返します。変更ストリームの出力における詳細なドキュメントについては、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリーム カーソルによって返されるすべてのドキュメントには、_id
フィールドとして再開トークンが含まれます。変更ストリームを再開するには、再開する変更イベントの _id
ドキュメント全体を watch()
の resumeAfter
または startAfter
オプションに渡します。
次の操作では、再開トークンを使用して data.sensors
コレクションに対する変更ストリーム カーソルを再開します。再開トークンを生成した操作が、クラスターの oplog からロール オフされていないことを前提としています。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed()
メソッドを cursor.hasNext()
メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
変更ストリームの再開に関する詳細なドキュメントについては、「変更ストリームの再開」を参照してください。