変更ストリーム
項目一覧
変更ストリームを使用すると、 oplog を手動で追跡する以前の複雑さやリスクなしで、アプリケーションがリアルタイムデータ変更にアクセスできます。アプリケーションは変更ストリームを使用して、単一のコレクション、データベース、または配置全体のすべてのデータ変更にサブスクライブし、それらに即時に対応できます。変更ストリームは集計フレームワークを使用するため、アプリケーションは特定の変更をフィルタリングすることも、通知を任意に変換することもできます。
可用性
変更ストリームは、レプリカセットとシャーディングされたクラスターで使用できます。
ストレージエンジン
レプリカセットとシャーディングされたクラスターでは、WiredTiger ストレージエンジンを使用する必要があります。変更ストリームは、MongoDB の保管時の暗号化機能を採用した配置でも使用できます。
レプリカセット プロトコル バージョン
レプリカセットとシャーディングされたクラスターは、レプリカセット プロトコル バージョン 1(
pv1
)を使用する必要があります。読み取り保証 (read concern) "過半数" 有効化
MongoDB 4.2 以降では、
"majority"
の読み取り保証 (read concern) のサポートに関係なく変更ストリームが利用可能になりました。つまり、読み取り保証 (read concern) のmajority
サポートが有効(デフォルト)でも無効でも、変更ストリームを利用できます。MongoDB 4.0 以前では、変更ストリームは、
"majority"
読み取り保証 (read concern) サポートが有効(デフォルト)の場合にのみ使用できます。
Stable API でのサポート
変更ストリームはStable API V 1に含まれています。
接続
変更ストリームの接続は、+srv
接続オプションを使用して DNS シードリスト、または接続文字列でサーバーを個別にリストする方法のいずれかを使用できます。
変更ストリームへの接続を失うか、接続が切断された場合、ドライバーは、読み込み設定 (read preference) が一致するクラスター内の別のノードを介して変更ストリームへの接続を再確立しようとします。ドライバーが正しく読み込み設定 (read preference) されているノードを見つけられない場合、例外が発生します。
詳細については、「接続文字列 URI 形式」を参照してください。
コレクション、データベース、または配置の監視
変更ストリームを以下に対して開くことができます。
ターゲット | 説明 |
---|---|
コレクション | 単一のコレクション( このページの例では、MongoDB ドライバーを使用して、単一のコレクションの変更ストリーム カーソルを開いて操作します。 |
データベース | 単一データベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
配置 | 配置(レプリカセットまたはシャーディングされたクラスター)の変更ストリーム カーソルを開き、すべてのデータベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
注意
変更ストリームの例
このページの例では、MongoDB ドライバーを使用して、コレクションの変更ストリーム カーソルを開き、変更ストリーム カーソルを操作する方法を説明します。
変更ストリームのパフォーマンスに関する考慮事項
データベースに対して開かれたアクティブな変更ストリームの量が接続プールのサイズを超えると、通知のレイテンシが発生する可能性があります。各変更ストリームは、次のイベントを待機する間、変更ストリームに対する接続と getMore 操作を使用します。レイテンシの問題を回避するには、開かれた変更ストリームの数よりもプール サイズの方が大きいことを確認しておく必要があります。詳細については、maxPoolSize の設定を参照してください。
変更ストリームを開く
変更ストリームを開くには
レプリカセットの場合、データを保持しているいずれのノードからでも、変更ストリームを開く操作を実行できます。
シャーディングされたクラスターの場合、変更ストリームを開く操作は
mongos
から実行する必要があります。
次の例では、コレクションの変更ストリームを開き、カーソルを反復処理して変更ストリーム ドキュメントを検索します。[1]
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
以下の C の例では MongoDB レプリカセットに接続し、アクセスしているデータベース に inventory
コレクションが含まれていることを前提としています。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下の C# の例では、MongoDB レプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下の Go の例では、MongoDBレプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下の Java の例では、MongoDB レプリカセットに接続し、inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
以下の例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
cursor = db.inventory.watch() document = await cursor.next()
以下の Node.js の例では、MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
以下の例では、MongoDB レプリカセットに接続し、 inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下の Python の例では、 MongoDB レプリカセットに接続し、 かつinventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cursor = db.inventory.watch() next(cursor)
以下の例では、MongoDB レプリカセットに接続し、 inventory
コレクションを含むデータベースにアクセスしていることを前提としています。
cursor = inventory.watch.to_enum next_change = cursor.next
以下の Swift(Async)例で は MongoDB レプリカセットに接続し、データベースにアクセスしているinventory
ことを前提としています。 これには コレクションが含まれます。
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下の Swift(Sync)例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory
コレクションが含まれていることを前提としています。
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
カーソルからデータ変更イベントを検索するには、変更ストリームのカーソルを反復処理します。変更ストリーム イベントの詳細については、「変更イベント」を参照してください。
変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
カーソルが明示的に閉じている。
無効化イベントが発生している(コレクションの削除や名前の変更など)。
MongoDB 配置への接続が閉じているか、タイムアウトになっている。詳細については、「カーソルの動作」を参照してください。
配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。
注意
閉じられていないカーソルのライフサイクルは言語に依存します。
[1] | startAtOperationTime を指定すると、特定の時点でカーソルを開くことができます。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。 |
変更ストリーム出力の修正
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
リストには、username
が alice
である操作または operationType
が delete
である操作をフィルター処理する単一の $match
ステージが含まれます。
pipeline
をwatch()
メソッドに渡すと、変更ストリームは指定されたpipeline
を介して通知を渡し、通知を返すように指示します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
次の例では、ストリームを使用して変更イベントを処理しています。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
Tip
変更ストリーム イベント ドキュメントの _id フィールドは、再開トークンとして機能します。変更ストリーム イベントの _id
フィールドを変更または削除するために、パイプラインを使用しないでください。
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
更新操作のための完全なドキュメントの検索
デフォルトでは、変更ストリームは更新操作中にフィールドのデルタのみを返します。ただし、更新されたドキュメントの過半数がコミットした最新のバージョンを返すように変更ストリームを構成できます。
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument"
オプションに "updateLookup"
値を指定してmongoc_collection_watch
メソッドを使用します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument
フィールドが含まれています。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、ストリームオプション SetFullDocument(options.UpdateLookup)
を変更します。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 FullDocument.UPDATE_LOOKUP
をdb.collection.watch.fullDocument()
メソッドに渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument
フィールドが含まれています。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す `full_document
フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、{ fullDocument: 'updateLookup' }
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
を db.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument
フィールドが含まれています。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'
を db.collection.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document
フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document: 'updateLookup'
を db.watch()
メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document
フィールドが含まれています。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)
をwatch()
メソッドに渡します。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)
をwatch()
メソッドに渡します。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
過半数がコミットした操作で、更新操作後、lookup 前に、更新されたドキュメントを変更した操作が 1 つ以上ある場合、返される完全なドキュメント全体は、更新操作時のドキュメントとは大幅に異なる可能性があります。
ただし、変更ストリーム ドキュメントに含まれるデルタには、その変更ストリーム イベントに適用された監視対象コレクションの変更が常に正しく記述されています。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリームは、カーソルを開くときに、resumeAfter または startAfter のいずれかに再開トークンを指定することによって再開できます。
resumeAfter
Change Streams の
カーソルを開くときに再開トークンを resumeAfter
に渡すことで、特定のイベントの後に変更ストリームを再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfter
を使用して変更ストリームを再開することはできません。代わりに、startAfter を使用して、無効化イベント後に新しい変更ストリームを開始できます。
以下の例では、ストリームが破棄された後に再作成されるように、ストリーム オプションにresumeAfter
オプションが追加されています。変更ストリームに _id
を渡すと、指定された操作の後に通知の再開が試行されます。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
以下の例では、 resumeToken
が最後の変更ストリーム ドキュメントから検索され、オプションとしてWatch()
メソッドに渡されています。 resumeToken
をWatch()
メソッドに渡すと、変更ストリームは、再開トークンで指定された操作の後に通知の再開を試行するように指示します。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter を使用できます 変更ストリームの再開トークンを指定します。resumeAfter オプションが設定されている場合、変更ストリームは再開トークンで指定された操作の後に通知を再開します。 SetResumeAfter
は、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeToken
です。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
resumeAfter()
メソッドを使用すると、再開トークンで指定された操作の後に通知を再開できます。 resumeAfter()
メソッドは、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeToken
です。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では $resumeToken
です。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
resume_after
修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after
修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token
です。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
resumeAfter
オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter
オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken
です。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
Change Streams の
カーソルを開くときに再開トークンを startAfter
に渡すことで、特定のイベントの後に新しい変更ストリームを開始できます。resumeAfter とは異なり、startAfter
は新しい変更ストリームを作成することで無効化イベント後に通知を再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
再開トークン
再開トークンは、複数のソースから利用できます。
ソース | 説明 |
---|---|
各変更イベントの通知には、 _id フィールドに再開トークンが含まれています。 | |
このフィールドは、 | |
getMore コマンドには、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。 |
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
変更イベントからの再開トークン
変更イベントの通知には、_id
フィールドに再開トークンが含まれています。
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
からの再開トークン aggregate
aggregate
コマンドを使用する場合、$changeStream
集計ステージには cursor.postBatchResumeToken
フィールドに再開トークンが含まれています。
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
からの再開トークン getMore
getMore
コマンドにも、cursor.postBatchResumeToken
フィールドに再開トークンが含まれています。
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
ユースケース
変更ストリームは、データの変更が確実であることが確認された後に下流のシステムに通知することで、依存しているビジネスシステムを持つアーキテクチャにメリットをもたらすことができます。たとえば、変更ストリームは、データ抽出・変換・ロード(ETL)サービス、クロスプラットフォーム同期、コラボレーション機能、通知サービスを実装する際、開発者の時間を節約するのに役立ちます。
アクセス制御
特定のコレクションに対して変更ストリームを開くには、対応するコレクションに対して
changeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 単一のデータベースに対して変更ストリームを開くには、データベース内のすべての非
system
コレクションに対してchangeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 配置の全体に対して変更ストリームを開くには、配置内のすべてのデータベースのすべての非
system
コレクションに対してchangeStream
アクションとfind
アクションを許可する権限がアプリケーションに必要です。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
イベント通知
変更ストリームは、レプリカセット内のデータを保持しているノードの過半数に反映されたデータ変更についてのみ通知します。これにより、障害発生時にも耐久性がある、過半数にコミットされた変更によってのみ通知がトリガーされるようになります。
たとえば、レプリカセットが3つのノードで構成されており、プライマリに対して変更ストリーム カーソルが開かれているとします。クライアントが挿入操作を実行した場合、変更ストリームは、挿入がデータを保持しているノードの過半数に保存された後でのみ、データ変更をアプリケーションに通知します。
操作がトランザクションに関連付けられている場合、変更イベント ドキュメントには txnNumber
と lsid
が含まれます。
照合
明示的な照合が指定されていない限り、変更ストリームはsimple
バイナリ比較を使用します。