Docs Menu
Docs Home
/
MongoDBマニュアル

変更ストリーム

項目一覧

  • 可用性
  • 接続
  • コレクション、データベース、または配置の監視
  • 変更ストリームのパフォーマンスに関する考慮事項
  • 変更ストリームを開く
  • 変更ストリーム出力の修正
  • 更新操作のための完全なドキュメントの検索
  • 変更ストリームの再開
  • ユースケース
  • アクセス制御
  • イベント通知
  • 照合
  • Change Streams と孤立したドキュメント
  • 変更ストリームにおけるドキュメントの変更前と変更後のイメージ

変更ストリームを使用すると、 oplog を手動で追跡する以前の複雑さやリスクなしで、アプリケーションがリアルタイムデータ変更にアクセスできます。アプリケーションは変更ストリームを使用して、単一のコレクション、データベース、または配置全体のすべてのデータ変更にサブスクライブし、それらに即時に対応できます。変更ストリームは集計フレームワークを使用するため、アプリケーションは特定の変更をフィルタリングすることも、通知を任意に変換することもできます。

MongoDB 5.1 以降では、変更ストリームが最適化され、リソースの使用効率が上がり、一部の集計パイプライン ステージの実行が高速化されています。

変更ストリームは、レプリカセットシャーディングされたクラスターで使用できます。

変更ストリームはStable API V 1に含まれています。 ただし、 showExpandedEventsオプションは Stable API V 1に含まれていません。

変更ストリームの接続は、+srv 接続オプションを使用して DNS シードリスト、または接続文字列でサーバーを個別にリストする方法のいずれかを使用できます。

変更ストリームへの接続を失うか、接続が切断された場合、ドライバーは、読み込み設定 (read preference) が一致するクラスター内の別のノードを介して変更ストリームへの接続を再確立しようとします。ドライバーが正しく読み込み設定 (read preference) されているノードを見つけられない場合、例外が発生します。

詳細については、「接続文字列 URI 形式」を参照してください。

変更ストリームを以下に対して開くことができます。

ターゲット
説明

コレクション

単一のコレクション(system コレクション、または admin データベース、local データベース、config データベース内のコレクションを除く)の変更ストリーム カーソルを開くことができます。

このページの例では、MongoDB ドライバーを使用して、単一のコレクションの変更ストリーム カーソルを開いて操作します。mongosh メソッド db.collection.watch() も参照してください。

データベース

単一データベース(adminlocalconfig データベースを除く)の変更ストリーム カーソルを開いて、すべての非システム コレクションに対する変更を監視できます。

MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 mongoshメソッドdb.watch()も参照してください。

配置

配置(レプリカセットまたはシャーディングされたクラスター)の変更ストリーム カーソルを開き、すべてのデータベース(adminlocalconfig を除く)でシステム以外のコレクションに加えられた変更を監視できます。

MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 mongoshメソッドMongo.watch()も参照してください。

注意

変更ストリームの例

このページの例では、MongoDB ドライバーを使用して、コレクションの変更ストリーム カーソルを開き、変更ストリーム カーソルを操作する方法を説明します。

データベースに対して開かれたアクティブな変更ストリームの量が接続プールのサイズを超えると、通知のレイテンシが発生する可能性があります。各変更ストリームは、次のイベントを待機する間、変更ストリームに対する接続と getMore 操作を使用します。レイテンシの問題を回避するには、開かれた変更ストリームの数よりもプール サイズの方が大きいことを確認しておく必要があります。詳細については、maxPoolSize の設定を参照してください。

変更ストリームをシャーディングされたクラスター上で開くとき

  • mongos は、各シャードに個別の変更ストリームを作成します。この動作は、変更ストリームが特定のシャードキーの範囲を対象にしているかどうかに関わらず発生します。

  • mongos は変更ストリームの結果を受け取ると、その結果をソートおよびフィルタリングします。必要に応じて、mongosfullDocument ルックアップも実行します。

最高のパフォーマンスを得るには、変更ストリームでの $lookup クエリの使用を制限します。

変更ストリームを開くには

  • レプリカセットの場合、データを保持しているいずれのノードからでも、変更ストリームを開く操作を実行できます。

  • シャーディングされたクラスターの場合、変更ストリームを開く操作は mongos から実行する必要があります。

次の例では、コレクションの変更ストリームを開き、カーソルを反復処理して変更ストリーム ドキュメントを検索します。[1]


➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。


以下の C の例では MongoDB レプリカセットに接続し、アクセスしているデータベースinventory コレクションが含まれていることを前提としています。

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

以下の C# の例では、MongoDB レプリカセットに接続し、inventory コレクションを含むデータベースにアクセスしていることを前提としています。

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

以下の Go の例では、MongoDBレプリカセットに接続し、inventory コレクションを含むデータベースにアクセスしていることを前提としています。

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

以下の Java の例では、MongoDB レプリカセットに接続し、inventory コレクションを含むデータベースにアクセスしていることを前提としています。

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

以下の Kotlin の例では、 MongoDB レプリカセットに接続し、 かつinventoryコレクションを含むデータベースにアクセスできる ことを前提としています。 これらのタスクの完了の詳細については、「 Kotlin コルーチン ドライバーのデータベースとコレクション 」のガイドを参照してください。

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

以下の例では MongoDB レプリカセットに接続し、アクセスしているデータベースinventory コレクションが含まれていることを前提としています。

cursor = db.inventory.watch()
document = await cursor.next()

以下の Node.js の例では、MongoDB レプリカセットに接続し、アクセスしているデータベースinventory コレクションが含まれていることを前提としています。

次の例では、ストリームを使用して変更イベントを処理しています。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

代わりに、イテレーターを使用して変更イベントを処理することもできます。

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

changeStream により EventEmitter が拡張されます。

以下の例では、MongoDB レプリカセットに接続し、 inventory コレクションを含むデータベースにアクセスしていることを前提としています。

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

以下の Python の例では、 MongoDB レプリカセットに接続し、 かつinventoryコレクションを含むデータベースにアクセスしていることを前提としています。

cursor = db.inventory.watch()
next(cursor)

以下の例では、MongoDB レプリカセットに接続し、 inventory コレクションを含むデータベースにアクセスしていることを前提としています。

cursor = inventory.watch.to_enum
next_change = cursor.next

以下の Swift(Async)例で は MongoDB レプリカセットに接続し、データベースにアクセスしているinventory ことを前提としています。 これには コレクションが含まれます。

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

以下の Swift(Sync)例では MongoDB レプリカセットに接続し、アクセスしているデータベースinventory コレクションが含まれていることを前提としています。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

カーソルからデータ変更イベントを検索するには、変更ストリームのカーソルを反復処理します。変更ストリーム イベントの詳細については、「変更イベント」を参照してください。

変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。

  • カーソルが明示的に閉じている。

  • 無効化イベントが発生している(コレクションの削除や名前の変更など)。

  • MongoDB 配置への接続が閉じているか、タイムアウトになっている。詳細については、「カーソルの動作」を参照してください。

  • 配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じることがあります。閉じた変更ストリームのカーソルは完全に再開できない場合があります。

注意

閉じられていないカーソルのライフサイクルは言語に依存します。

[1] startAtOperationTime を指定すると、特定の時点でカーソルを開くことができます。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。

➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。


変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipelineリストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$matchステージが含まれます。

  • username 値は alice

  • operationType 値は delete

pipelinewatch()メソッドに渡すと、変更ストリームは指定されたpipelineを介して通知を渡し、通知を返すように指示します。

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

pipelineリストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$matchステージが含まれます。

  • username 値は alice

  • operationType 値は delete

pipelinewatch()メソッドに渡すと、変更ストリームは指定されたpipelineを介して通知を渡し、通知を返すように指示します。

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

次の例では、ストリームを使用して変更イベントを処理しています。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

代わりに、イテレーターを使用して変更イベントを処理することもできます。

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

Tip

変更ストリーム イベント ドキュメントの _id フィールドは、再開トークンとして機能します。変更ストリーム イベントの _id フィールドを変更または削除するために、パイプラインを使用しないでください。

MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。

変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。

デフォルトでは、変更ストリームは更新操作中にフィールドのデルタのみを返します。ただし、更新されたドキュメントの過半数がコミットした最新のバージョンを返すように変更ストリームを構成できます。


➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。


過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument" オプションに "updateLookup" 値を指定してmongoc_collection_watch メソッドを使用します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"db.collection.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、ストリームオプション SetFullDocument(options.UpdateLookup) を変更します。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 FullDocument.UPDATE_LOOKUPdb.collection.watch.fullDocument()メソッドに渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 ChangeStreamFlow.FullDocument() に渡します。FullDocument.UPDATE_LOOKUP使用して複数のドキュメントを挿入できます。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'db.collection.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す `full_document フィールドが含まれています。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、{ fullDocument: 'updateLookup' }db.collection.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。

次の例では、ストリームを使用して変更イベントを処理しています。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

代わりに、イテレーターを使用して変更イベントを処理することもできます。

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"db.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup'db.collection.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document フィールドが含まれています。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document: 'updateLookup'db.watch() メソッドへ渡します。

以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document フィールドが含まれています。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options: ChangeStreamOptions(fullDocument: .updateLookup)watch()メソッドに渡します。

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options: ChangeStreamOptions(fullDocument: .updateLookup)watch()メソッドに渡します。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

注意

過半数がコミットした操作で、更新操作、lookup に、更新されたドキュメントを変更した操作が 1 つ以上ある場合、返される完全なドキュメント全体は、更新操作時のドキュメントとは大幅に異なる可能性があります。

ただし、変更ストリーム ドキュメントに含まれるデルタには、その変更ストリーム イベントに適用された監視対象コレクションの変更が常に正しく記述されています。

次のいずれかに当てはまる場合、更新イベントの fullDocument フィールドが欠落していることがあります。

  • ドキュメントが削除された場合、または更新と検索の間にコレクションが削除された場合。

  • 更新によって、そのコレクションのシャードキーで、少なくとも1つのフィールドの値が変更された場合。

変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。

変更ストリームは、カーソルを開くときに、resumeAfter または startAfter のいずれかに再開トークンを指定することによって再開できます。

カーソルを開くときに再開トークンを resumeAfter に渡すことで、特定のイベントの後に変更ストリームを再開できます。

再開トークンの詳細については、「再開トークン」を参照してください。

重要

  • oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。

  • 無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、resumeAfter を使用して変更ストリームを再開することはできません。代わりに、startAfter を使用して、無効化イベント後に新しい変更ストリームを開始できます。

以下の例では、ストリームが破棄された後に再作成されるように、ストリーム オプションにresumeAfter オプションが追加されています。変更ストリームに _id を渡すと、指定された操作の後に通知の再開が試行されます。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

以下の例では、 resumeTokenが最後の変更ストリーム ドキュメントから検索され、オプションとしてWatch()メソッドに渡されています。 resumeTokenWatch()メソッドに渡すと、変更ストリームは、再開トークンで指定された操作の後に通知の再開を試行するように指示します。

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

ChangeStreamOptions.SetResumeAfter を使用できます 変更ストリームの再開トークンを指定します。resumeAfter オプションが設定されている場合、変更ストリームは再開トークンで指定された操作の後に通知を再開します。 SetResumeAfterは、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeTokenです。

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

resumeAfter()メソッドを使用すると、再開トークンで指定された操作の後に通知を再開できます。 resumeAfter()メソッドは、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeTokenです。

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

ChangeStreamFlow.resumeAfter()メソッドを使用して、再開トークンで指定された操作の後に通知を再開します。 resumeAfter()メソッドは、以下の例のresumeToken変数など、再開トークンに解決する必要がある値を受け取ります。

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では $resumeToken です。

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

カーソルを開くときに再開トークンを startAfter に渡すことで、特定のイベントの後に新しい変更ストリームを開始できます。resumeAfter とは異なり、startAfter は新しい変更ストリームを作成することで無効化イベント後に通知を再開できます。

再開トークンの詳細については、「再開トークン」を参照してください。

重要

  • oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。

再開トークンは、複数のソースから利用できます。

ソース
説明

各変更イベントの通知には、_id フィールドに再開トークンが含まれています。

$changeStream 集計ステージには、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。

このフィールドは、aggregate コマンドを使用しているときにのみ表示されます。

getMore コマンドには、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。

MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。

Tip

MongoDB に備わる 「スニペット」は、16 進数でエンコードされた再開トークンを解読する mongosh の拡張機能です。

再開トークン をインストールして実行できます からのスニペットmongosh :

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

再開トークン を実行することもできます mongoshnpmシステムに がインストールされている場合は、コマンドラインからの実行( を使用せずに)。

npx mongodb-resumetoken-decoder <RESUME TOKEN>

下記についての詳細は、参照先をご覧ください。

変更イベントの通知には、_id フィールドに再開トークンが含まれています。

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

aggregateコマンドを使用する場合、$changeStream 集計ステージには cursor.postBatchResumeToken フィールドに再開トークンが含まれています。

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore コマンドにも、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

変更ストリームは、データの変更が確実であることが確認された後に下流のシステムに通知することで、依存しているビジネスシステムを持つアーキテクチャにメリットをもたらすことができます。たとえば、変更ストリームは、データ抽出・変換・ロード(ETL)サービス、クロスプラットフォーム同期、コラボレーション機能、通知サービスを実装する際、開発者の時間を節約するのに役立ちます。

自己管理型配置認可に認証を強制する配置の場合:

  • 特定のコレクションに対して変更ストリームを開くには、対応するコレクションに対して changeStream アクションと find アクションを許可する権限がアプリケーションに必要です。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 単一のデータベースに対して変更ストリームを開くには、データベース内のすべての非 system コレクションに対して changeStream アクションと find アクションを許可する権限がアプリケーションに必要です。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 配置の全体に対して変更ストリームを開くには、配置内のすべてのデータベースのすべての非 system コレクションに対して changeStream アクションとfind アクションを許可する権限がアプリケーションに必要です。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

変更ストリームは、レプリカセット内のデータを保持しているノードの過半数に反映されたデータ変更についてのみ通知します。これにより、障害発生時にも耐久性がある、過半数にコミットされた変更によってのみ通知がトリガーされるようになります。

たとえば、レプリカセットが3つのノードで構成されており、プライマリに対して変更ストリーム カーソルが開かれているとします。クライアントが挿入操作を実行した場合、変更ストリームは、挿入がデータを保持しているノードの過半数に保存された後でのみ、データ変更をアプリケーションに通知します。

操作がトランザクションに関連付けられている場合、変更イベント ドキュメントには txnNumberlsid が含まれます。

明示的な照合が指定されていない限り、変更ストリームはsimple バイナリ比較を使用します。

MongoDB5.3 以降では、 範囲移行 中に、 孤立したドキュメント の更新に対して 変更ストリーム イベントは生成されません。

MongoDB 6.0 以降では、変更ストリーム イベントを使用して、変更前と変更後のドキュメントのバージョン(変更前とイメージと変更後のイメージ)を出力できます。

  • 変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。

  • 変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。

  • db.createCollection()create 、または collModを使用し、コレクションに対して changeStreamPreAndPostImages を有効にします。

変更ストリーム イベントにおいて、次の条件に当てはまる場合、変更前と変更後のイメージは使用できません。

  • ドキュメントの更新または削除操作時に、コレクションにおいて有効になっていない場合。

  • expireAfterSeconds で設定した、変更前と変更後のイメージ保持時間が経過した後に削除された場合。

    • 次の例では、クラスター全体でexpireAfterSeconds100秒に設定します。

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 次の例では、expireAfterSeconds を含む現在の changeStreamOptions 設定を返します。

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSecondsoff に設定すると、デフォルトの保持ポリシーが適用されます。対応する変更ストリーム イベントがoplog から削除されるまで、変更前と変更後のイメージは保持されます。

    • 変更ストリーム イベントが oplog から削除されると、 expireAfterSeconds の変更前と変更後のイメージの保持時間にかかわらず、対応する変更前と変更後のイメージも削除されます。

その他の考慮事項

  • 変更前と変更後のイメージを有効にすると、ストレージ容量が消費され、処理時間が増えます。変更前と変更後のイメージは、必要な場合のみ有効にしてください。

  • 変更ストリーム イベントのサイズを 16 メガバイト未満に制限します。イベントのサイズを制限するには、次の方法があります。

    • ドキュメントのサイズを 8 MB に制限します。updateDescription のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、変更ストリーム出力で変更前と変更後のイメージを同時にリクエストできます。

    • updateDescription のような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更後のイメージのみをリクエストしてください。

    • 次の場合、ドキュメントの変更ストリーム出力では最大 16 MB の変更前のイメージのみをリクエストしてください。

      • ドキュメントのアップデートがドキュメントの構造または内容のごく一部にしか影響しない場合、そして

      • replace 変更イベントが発生しない場合。replace イベントには、常に変更後のイメージが含まれます。

  • 変更前イメージをリクエストするには、db.collection.watch() で、fullDocumentBeforeChangerequired または whenAvailable に設定します。変更後イメージをリクエストするには、同じ方法で fullDocument を設定します。

  • 変更前のイメージは config.system.preimages コレクションに書き込まれます。

    • config.system.preimages コレクションが大きくなる場合があります。コレクションのサイズを制限するには、前述のとおり、変更前のイメージに expireAfterSeconds 時間を設定します。

    • 変更前のイメージはバックグラウンド プロセスによって非同期で削除されます。

重要

下位互換性のない機能

MongoDB 6.0 以降では、変更ストリームにドキュメントの変更前のイメージと変更後のイメージを使用している場合、以前の MongoDB バージョンにダウングレードする前に、collMod コマンドを使用して各コレクションの changeStreamPreAndPostImages を無効にする必要があります。

Tip

以下も参照してください。

変更ストリーム出力の完全な例については、「Change Streams とドキュメントの変更前イメージおよび変更後イメージ」を参照してください。

戻る

制限