オープンChange Streams
Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションが単一のコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB サーバーの機能です。 アプリケーションが受信したデータをフィルタリングして変換するための集計演算子のセットを指定できます。 MongoDB v6.0 以降に接続する場合、変更前と変更後のドキュメント データを含めるようにイベントを構成できます。
変更ストリームを開いて構成する方法については、次のセクションを参照してください。
変更ストリームを開く
変更ストリームを開いて、特定のタイプのデータ変更をサブスクライブし、アプリケーション内で変更イベントを生成できます。
変更ストリームを開くには、 MongoCollection
、 MongoDatabase
、またはMongoClient
のインスタンスで watch()
メソッドを呼び出します。
重要
スタンドアロンの MongoDB 配置では、この機能にはレプリカセットの oplog が必要なため、変更ストリームはサポートされていません。 oplogの詳細については、レプリカセットoplogサーバーのマニュアル ページを参照してください。
watch()
メソッドを呼び出すオブジェクトによって、変更ストリームがリッスンするイベントの範囲が決まります。
MongoCollection
でwatch()
を呼び出すと、変更ストリームによってコレクションが監視されます。
MongoDatabase
でwatch()
を呼び出すと、変更ストリームによってそのデータベース内のすべてのコレクションが監視されます。
MongoClient
でwatch()
を呼び出すと、変更ストリームによって接続された MongoDB 配置のすべての変更が監視されます。
例
次のコード例は、コレクションのデータが変更されるたびに変更ストリームを開き、変更ストリーム イベントを出力する方法を示しています。
// Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
コレクションに対して挿入操作を実行すると、次のテキストのような出力が生成されます。
Received a change event: ChangeStreamDocument{ operationType='insert', resumeToken={"_data": "825EC..."}, namespace=myDb.myChangeStreamCollection, ... }
実行可能な例については、「 変更監視 」の使用例ページを参照してください。
watch()
メソッドの詳細については、次の API ドキュメントを参照してください。
変更ストリームへの集計演算子の適用
集計パイプラインをパラメーターとしてwatch()
メソッドに渡して、変更ストリームが受信する変更イベントを指定できます。
MongoDB サーバーのバージョンがサポートする集計演算子については、「変更ストリーム出力の変更 」を参照してください。
例
次のコード例は、集計パイプラインを適用して、挿入操作とアップデート操作のみの変更イベントを受け取るように変更ストリームを構成する方法を示しています。
val pipeline = listOf( Aggregates.match(Filters.`in`("operationType", listOf("insert", "update"))) ) // Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
変更ストリームが 更新 変更イベントを受信すると、上記のコード例では次のテキストが出力されます。
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, ...
大規模な変更ストリーム イベントの分裂
MongoDB v7.0 以降に接続する場合、 $changeStreamSplitLargeEvent
集計演算子を使用して 16 MB を超えるイベント ドキュメントを小さなフラグメントに分割できます。
$changeStreamSplitLargeEvent
演算子は、変更ストリーム イベントがドキュメント サイズの制限を超えると予想される場合にのみ使用してください。 例えば、アプリケーションで完全なドキュメントの変更前イメージまたは変更後イメージが必要な場合は、この機能を使用できます。
$changeStreamSplitLargeEvent
集計ステージは フラグメントを順番に返します。 変更ストリーム カーソル を使用してフラグメントにアクセスできます。 各フラグメント ドキュメントには、次のフィールドを含むsplitEvent
オブジェクトが含まれています。
フィールド | 説明 |
---|---|
fragment | フラグメントのインデックス(開始) 1 |
of | 分裂イベントを構成するフラグメントの合計数 |
次の例では、大規模なイベントを分割するために、 $changeStreamSplitLargeEvent
集計ステージを持つ集計パイプラインを含む変更ストリームを開きます。
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument())) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
注意
集計パイプラインには$changeStreamSplitLargeEvent
ステージを 1 つだけ含めることができ、パイプラインの最後のステージである必要があります。
$changeStreamSplitLargeEvent
集計演算子の詳細については、サーバー マニュアルの$changeStreamSplitLargeEvent(集計)を参照してください。
変更前と変更後のイメージを含みます
変更イベントは、以下のデータを含めるか省略するように構成できます。
操作前のドキュメントのバージョンを表すドキュメント(存在する場合)
操作後のドキュメントのバージョンを表すドキュメント(存在する場合)
変更前イメージまたは変更後イメージを含む変更ストリーム イベントを受信するには、MongoDB v6.0 以降の配置に接続し、次を設定する必要があります。
MongoDB 配置のコレクションの変更前と変更後のイメージを有効にします。
Tip
配置でこれらを有効にする方法については、「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」MongoDB サーバーのマニュアル ページを参照してください。
変更前と変更後のイメージが有効になっているコレクションを作成するようにドライバーに指示する方法については、「 変更前と変更後のイメージが有効になっているコレクションの作成 」セクションを参照してください。
変更ストリームを設定して、変更前のイメージと変更後のイメージのどちらかを取得します。
Tip
変更前のイメージを含むように変更ストリームを構成する方法については、「 変更前のイメージの構成例 」を参照してください。
変更後のイメージを含むように変更ストリームを構成するには、「 変更後のイメージの構成例 」を参照してください。
変更前と変更後のイメージが有効になっているコレクションの作成
ドライバーを使用して 変更前と変更後のイメージ オプションでコレクションを作成するには、次の例に示すように、 ChangeStreamPreAndPostImagesOptions
のインスタンスを指定し、 createCollection()
メソッドを呼び出します。
val collectionOptions = CreateCollectionOptions() collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true)) database.createCollection("myChangeStreamCollection", collectionOptions)
MongoDB Shell からcollMod
コマンドを実行すると、既存のコレクションの変更前イメージと変更後イメージ オプションを変更できます。 この操作の実行方法については、 collModサーバーのマニュアル ドキュメントを参照してください。
警告
コレクションでこのオプションを変更すると、変更前イメージまたは変更後イメージの受信を要求するように構成されている場合、アプリケーション内のそのコレクションに対して実行される変更ストリームが失敗する可能性があります。
変更前のイメージの構成例
次のコード例は、変更ストリームを構成して、変更前のイメージを含め、結果を出力する方法を示しています。
val job = launch { val changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
上記の例では、変更ストリームがFullDocumentBeforeChange.REQUIRED
オプションを使用するように構成されています。 これにより、置換、アップデート、削除される変更イベントの変更前のイメージを返し、変更前のイメージが利用できない場合にサーバーがエラーを発生させるように変更ストリームが構成されます。
ソフトウェア ライブラリの依存関係のコレクション内のドキュメントのlatestVersion
フィールドを2.0.0
の値から2.1.0
に更新したとします。 上記のコード例による対応する変更イベントの出力は、次のテキストのようになります。
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...} namespace=software.libraries, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}}, ...
オプションのリストについては、次を参照して ください API ドキュメント。
変更後のイメージの構成例
次のコード例は、変更ストリームを構成して変更後のイメージを含め、結果を出力する方法を示しています。
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
上記の例では、変更ストリームがFullDocument.UPDATE_LOOKUP
オプションを使用するように構成されています。 これにより、元のドキュメントと変更されたドキュメント間のデルタと、変更が発生した後の特定の時点でのドキュメントのコピーの両方を返すように変更ストリームが構成されます。
アプリケーションが、ドキュメントのpopulation
フィールドを800
の値から950
にアップデートしたとします。 上記のコード例による対応する変更イベントの出力は、次のテキストのようになります。
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, namespace=censusData.cities, destinationNamespace=null, fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}}, updatedFields={"population": 950}, ... ...
オプションのリストについては、 FullDocument を参照してください API ドキュメント。