変更ストリーム
項目一覧
変更ストリームを使用すると、 oplog を手動で追跡する以前の複雑さやリスクなしで、アプリケーションがリアルタイムデータ変更にアクセスできます。アプリケーションは変更ストリームを使用して、単一のコレクション、データベース、または配置全体のすべてのデータ変更にサブスクライブし、それらに即時に対応できます。変更ストリームは集計フレームワークを使用するため、アプリケーションは特定の変更をフィルタリングすることも、通知を任意に変換することもできます。
MongoDB 5.1 以降では、変更ストリームが最適化され、リソースの使用効率が上がり、一部の集計パイプライン ステージの実行が高速化されています。
可用性
変更ストリームは、レプリカセットとシャーディングされたクラスターで使用できます。
ストレージエンジン
レプリカセットとシャーディングされたクラスターでは、WiredTiger ストレージエンジンを使用する必要があります。変更ストリームは、MongoDB の保管時の暗号化機能を採用した配置でも使用できます。
レプリカセット プロトコル バージョン
レプリカセットとシャーディングされたクラスターは、レプリカセット プロトコル バージョン 1(
pv1
)を使用する必要があります。読み取り保証 (read concern) "過半数" 有効化
変更ストリームは、
"majority"
の読み取り保証 (read concern) のサポートに関係なく 使用できます 。つまり、変更ストリームを使用するには、読み取り保証(read concern)majority
のサポートを有効にする(デフォルト)か無効にするかを選択できます。
Stable API でのサポート
変更ストリームはStable API V 1に含まれています。 ただし、 showExpandedEventsオプションは Stable API V 1に含まれていません。
接続
変更ストリームの接続は、+srv
接続オプションを使用して DNS シードリスト、または接続文字列でサーバーを個別にリストする方法のいずれかを使用できます。
変更ストリームへの接続を失うか、接続が切断された場合、ドライバーは、読み込み設定 (read preference) が一致するクラスター内の別のノードを介して変更ストリームへの接続を再確立しようとします。ドライバーが正しく読み込み設定 (read preference) されているノードを見つけられない場合、例外が発生します。
詳細については、「接続文字列 URI 形式」を参照してください。
コレクション、データベース、または配置の監視
変更ストリームを以下に対して開くことができます。
ターゲット | 説明 |
---|---|
コレクション | 単一のコレクション( このページの例では、MongoDB ドライバーを使用して、単一のコレクションの変更ストリーム カーソルを開いて操作します。 |
データベース | 単一データベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
配置 | 配置(レプリカセットまたはシャーディングされたクラスター)の変更ストリーム カーソルを開き、すべてのデータベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
注意
変更ストリームの例
このページの例では、MongoDB ドライバーを使用して、コレクションの変更ストリーム カーソルを開き、変更ストリーム カーソルを操作する方法を説明します。
変更ストリームのパフォーマンスに関する考慮事項
データベースに対して開かれたアクティブな変更ストリームの量が接続プールのサイズを超えると、通知のレイテンシが発生する可能性があります。各変更ストリームは、次のイベントを待機する間、変更ストリームに対する接続と getMore 操作を使用します。レイテンシの問題を回避するには、開かれた変更ストリームの数よりもプール サイズの方が大きいことを確認しておく必要があります。詳細については、maxPoolSize の設定を参照してください。
シャーディングされたクラスターの考慮事項
変更ストリームをシャーディングされたクラスター上で開くとき
mongos
は、各シャードに個別の変更ストリームを作成します。この動作は、変更ストリームが特定のシャードキーの範囲を対象にしているかどうかに関わらず発生します。mongos
は変更ストリームの結果を受け取ると、その結果をソートおよびフィルタリングします。必要に応じて、mongos
はfullDocument
ルックアップも実行します。
最高のパフォーマンスを得るには、変更ストリームでの $lookup
クエリの使用を制限します。
変更ストリームを開く
変更ストリームを開くには
レプリカセットの場合、データを保持しているいずれのノードからでも、変更ストリームを開く操作を実行できます。
シャーディングされたクラスターの場合、変更ストリームを開く操作は
mongos
から実行する必要があります。
次の例では、コレクションの変更ストリームを開き、カーソルを反復処理して変更ストリーム ドキュメントを検索します。[1]
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
以下の C の例では MongoDB レプリカセットに接続し、アクセスしているデータベース に inventory
コレクションが含まれていることを前提としています。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下の C# の例では、MongoDB レプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下の Go の例では、MongoDBレプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下の Java の例では、MongoDB レプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
以下の Kotlin の例では、 MongoDB レプリカセットに接続し、 かつinventory
コレクションを含むデータベースにアクセスできる ことを前提としています。 これらのタスクの完了の詳細については、「 Kotlin コルーチン ドライバーのデータベースとコレクション 」のガイドを参照してください。
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
以下の例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
cursor = db.inventory.watch() document = await cursor.next()
以下の Node.js の例では、MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
changeStream により EventEmitter が拡張されます。
以下の例では、MongoDB レプリカセットに接続し、 inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下の Python の例では、 MongoDB レプリカセットに接続し、 かつinventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cursor = db.inventory.watch() next(cursor)
以下の例では、MongoDB レプリカセットに接続し、 inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cursor = inventory.watch.to_enum next_change = cursor.next
以下の Swift(Async)例で は MongoDB レプリカセットに接続し、データベースにアクセスしているinventory
ことを前提としています。 これには コレクションが含まれます。
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下の Swift(Sync)例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
カーソルからデータ変更イベントを検索するには、変更ストリームのカーソルを反復処理します。変更ストリーム イベントの詳細については、「変更イベント」を参照してください。
変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
カーソルが明示的に閉じている。
無効化イベントが発生している(コレクションの削除や名前の変更など)。
MongoDB 配置への接続が閉じているか、タイムアウトになっている。詳細については、「カーソルの動作」を参照してください。
配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じることがあります。閉じた変更ストリームのカーソルは完全に再開できない場合があります。
注意
閉じられていないカーソルのライフサイクルは言語に依存します。
[1] | startAtOperationTime を指定すると、特定の時点でカーソルを開くことができます。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。 |
変更ストリーム出力の修正
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
リストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$match
ステージが含まれます。
username
値はalice
operationType
値はdelete
pipeline
をwatch()
メソッドに渡すと、変更ストリームは指定されたpipeline
を介して通知を渡し、通知を返すように指示します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
pipeline
リストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$match
ステージが含まれます。
username
値はalice
operationType
値はdelete
pipeline
をwatch()
メソッドに渡すと、変更ストリームは指定されたpipeline
を介して通知を渡し、通知を返すように指示します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
次の例では、ストリームを使用して変更イベントを処理しています。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
Tip
変更ストリーム イベント ドキュメントの _id フィールドは、再開トークンとして機能します。変更ストリーム イベントの _id
フィールドを変更または削除するために、パイプラインを使用しないでください。
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
更新操作のための完全なドキュメントの検索
デフォルトでは、変更ストリームは更新操作中にフィールドのデルタのみを返します。ただし、更新されたドキュメントの過半数がコミットした最新のバージョンを返すように変更ストリームを構成できます。
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument"
オプションに "updateLookup"
値を指定してmongoc_collection_watch
メソッドを使用します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument
フィールドが含まれています。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、ストリームオプション SetFullDocument(options.UpdateLookup)
を変更します。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 FullDocument.UPDATE_LOOKUP
をdb.collection.watch.fullDocument()
メソッドに渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument
フィールドが含まれています。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 ChangeStreamFlow.FullDocument() に渡します。FullDocument.UPDATE_LOOKUP
使用して複数のドキュメントを挿入できます。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument
フィールドが含まれています。
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す `full_document
フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、{ fullDocument: 'updateLookup' }
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
を db.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document
フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document: 'updateLookup'
を db.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document
フィールドが含まれています。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)
をwatch()
メソッドに渡します。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)
をwatch()
メソッドに渡します。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
過半数がコミットした操作で、更新操作後、lookup 前に、更新されたドキュメントを変更した操作が 1 つ以上ある場合、返される完全なドキュメント全体は、更新操作時のドキュメントとは大幅に異なる可能性があります。
ただし、変更ストリーム ドキュメントに含まれるデルタには、その変更ストリーム イベントに適用された監視対象コレクションの変更が常に正しく記述されています。
次のいずれかに当てはまる場合、更新イベントの fullDocument
フィールドが欠落していることがあります。
ドキュメントが削除された場合、または更新と検索の間にコレクションが削除された場合。
更新によって、そのコレクションのシャードキーで、少なくとも1つのフィールドの値が変更された場合。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリームは、カーソルを開くときに、resumeAfter または startAfter のいずれかに再開トークンを指定することによって再開できます。
resumeAfter
Change Streams の
カーソルを開くときに再開トークンを resumeAfter
に渡すことで、特定のイベントの後に変更ストリームを再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfter
を使用して変更ストリームを再開することはできません。代わりに、startAfter を使用して、無効化イベント後に新しい変更ストリームを開始できます。
以下の例では、ストリームが破棄された後に再作成されるように、ストリーム オプションにresumeAfter
オプションが追加されています。変更ストリームに _id
を渡すと、指定された操作の後に通知の再開が試行されます。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
以下の例では、 resumeToken
が最後の変更ストリーム ドキュメントから検索され、オプションとしてWatch()
メソッドに渡されています。 resumeToken
をWatch()
メソッドに渡すと、変更ストリームは、再開トークンで指定された操作の後に通知の再開を試行するように指示します。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter を使用できます 変更ストリームの再開トークンを指定します。resumeAfter オプションが設定されている場合、変更ストリームは再開トークンで指定された操作の後に通知を再開します。 SetResumeAfter
は、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeToken
です。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
resumeAfter()
メソッドを使用すると、再開トークンで指定された操作の後に通知を再開できます。 resumeAfter()
メソッドは、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeToken
です。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
ChangeStreamFlow.resumeAfter()メソッドを使用して、再開トークンで指定された操作の後に通知を再開します。 resumeAfter()
メソッドは、以下の例のresumeToken
変数など、再開トークンに解決する必要がある値を受け取ります。
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では $resumeToken
です。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
Change Streams の
カーソルを開くときに再開トークンを startAfter
に渡すことで、特定のイベントの後に新しい変更ストリームを開始できます。resumeAfter とは異なり、startAfter
は新しい変更ストリームを作成することで無効化イベント後に通知を再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
再開トークン
再開トークンは、複数のソースから利用できます。
ソース | 説明 |
---|---|
各変更イベントの通知には、 _id フィールドに再開トークンが含まれています。 | |
このフィールドは、 | |
getMore コマンドには、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。 |
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
Tip
MongoDB に備わる 「スニペット」は、16 進数でエンコードされた再開トークンを解読する mongosh
の拡張機能です。
再開トークン をインストールして実行できます からのスニペットmongosh
:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
再開トークン を実行することもできます mongosh
npm
システムに がインストールされている場合は、コマンドラインからの実行( を使用せずに)。
npx mongodb-resumetoken-decoder <RESUME TOKEN>
下記についての詳細は、参照先をご覧ください。
変更イベントからの再開トークン
変更イベントの通知には、_id
フィールドに再開トークンが含まれています。
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
からの再開トークン aggregate
aggregate
コマンドを使用する場合、$changeStream
集計ステージには cursor.postBatchResumeToken
フィールドに再開トークンが含まれています。
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
からの再開トークン getMore
getMore
コマンドにも、cursor.postBatchResumeToken
フィールドに再開トークンが含まれています。
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
ユースケース
変更ストリームは、データの変更が確実であることが確認された後に下流のシステムに通知することで、依存しているビジネスシステムを持つアーキテクチャにメリットをもたらすことができます。たとえば、変更ストリームは、データ抽出・変換・ロード(ETL)サービス、クロスプラットフォーム同期、コラボレーション機能、通知サービスを実装する際、開発者の時間を節約するのに役立ちます。
アクセス制御
特定のコレクションに対して変更ストリームを開くには、対応するコレクションに対して
changeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 単一のデータベースに対して変更ストリームを開くには、データベース内のすべての非
system
コレクションに対してchangeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 配置の全体に対して変更ストリームを開くには、配置内のすべてのデータベースのすべての非
system
コレクションに対してchangeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
イベント通知
変更ストリームは、レプリカセット内のデータを保持しているノードの過半数に反映されたデータ変更についてのみ通知します。これにより、障害発生時にも耐久性がある、過半数にコミットされた変更によってのみ通知がトリガーされるようになります。
たとえば、レプリカセットが3つのノードで構成されており、プライマリに対して変更ストリーム カーソルが開かれているとします。クライアントが挿入操作を実行した場合、変更ストリームは、挿入がデータを保持しているノードの過半数に保存された後でのみ、データ変更をアプリケーションに通知します。
操作がトランザクションに関連付けられている場合、変更イベント ドキュメントには txnNumber
と lsid
が含まれます。
照合
明示的な照合が指定されていない限り、変更ストリームはsimple
バイナリ比較を使用します。
Change Streams と孤立したドキュメント
MongoDB5.3 以降では、 範囲移行 中に、 孤立したドキュメント の更新に対して 変更ストリーム イベントは生成されません。
変更ストリームにおけるドキュメントの変更前と変更後のイメージ
MongoDB 6.0 以降では、変更ストリーム イベントを使用して、変更前と変更後のドキュメントのバージョン(変更前とイメージと変更後のイメージ)を出力できます。
変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
db.createCollection()
、create
、またはcollMod
を使用し、コレクションに対してchangeStreamPreAndPostImages
を有効にします。
変更ストリーム イベントにおいて、次の条件に当てはまる場合、変更前と変更後のイメージは使用できません。
ドキュメントの更新または削除操作時に、コレクションにおいて有効になっていない場合。
expireAfterSeconds
で設定した、変更前と変更後のイメージ保持時間が経過した後に削除された場合。次の例では、クラスター全体で
expireAfterSeconds
を100
秒に設定します。use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 次の例では、
expireAfterSeconds
を含む現在のchangeStreamOptions
設定を返します。db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSeconds
をoff
に設定すると、デフォルトの保持ポリシーが適用されます。対応する変更ストリーム イベントがoplog から削除されるまで、変更前と変更後のイメージは保持されます。変更ストリーム イベントが oplog から削除されると、
expireAfterSeconds
の変更前と変更後のイメージの保持時間にかかわらず、対応する変更前と変更後のイメージも削除されます。
その他の考慮事項
変更前と変更後のイメージを有効にすると、ストレージ容量が消費され、処理時間が増えます。変更前と変更後のイメージは、必要な場合のみ有効にしてください。
変更ストリーム イベントのサイズを 16 メガバイト未満に制限します。イベントのサイズを制限するには、次の方法があります。
ドキュメントのサイズを 8 MB に制限します。
updateDescription
のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、変更ストリーム出力で変更前と変更後のイメージを同時にリクエストできます。updateDescription
のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更後のイメージのみをリクエストしてください。次の場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更前のイメージのみをリクエストしてください。
ドキュメントのアップデートがドキュメントの構造または内容のごく一部にしか影響しない場合、そして
replace
変更イベントが発生しない場合。replace
イベントには、常に変更後のイメージが含まれます。
変更前イメージをリクエストするには、
db.collection.watch()
で、fullDocumentBeforeChange
をrequired
またはwhenAvailable
に設定します。変更後イメージをリクエストするには、同じ方法でfullDocument
を設定します。変更前のイメージは
config.system.preimages
コレクションに書き込まれます。config.system.preimages
コレクションが大きくなる場合があります。コレクションのサイズを制限するには、前述のとおり、変更前のイメージにexpireAfterSeconds
時間を設定します。変更前のイメージはバックグラウンド プロセスによって非同期で削除されます。
重要
下位互換性のない機能
MongoDB 6.0 以降では、変更ストリームにドキュメントの変更前のイメージと変更後のイメージを使用している場合、以前の MongoDB バージョンにダウングレードする前に、collMod
コマンドを使用して各コレクションの changeStreamPreAndPostImages を無効にする必要があります。
Tip
以下も参照してください。
変更ストリーム イベントと出力については、「変更イベント」を参照してください。
コレクションの変更を監視するには、
db.collection.watch()
を参照してください。変更ストリーム出力の完全な例については、「Change Streams とドキュメントの変更前イメージおよび変更後イメージ」を参照してください。
変更ストリーム出力の完全な例については、「Change Streams とドキュメントの変更前イメージおよび変更後イメージ」を参照してください。