オープンChange Streams
Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションが単一のコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
アプリケーションが受信したデータをフィルタリングして変換するための集計演算子のセットを指定できます。 MongoDB 配置 v6.0 以降に接続する場合は、変更前と変更後のドキュメント データを含めるようにイベントを構成することもできます。
変更ストリームを開いて構成する方法については、次のセクションを参照してください。
変更ストリームを開く
変更ストリームを開いて、特定のタイプのデータ変更をサブスクライブし、アプリケーション内で変更イベントを生成できます。
変更ストリームを開くには、 MongoCollection
、 MongoDatabase
、またはMongoClient
のインスタンスで watch()
メソッドを呼び出します。
重要
スタンドアロンの MongoDB 配置では、この機能にはレプリカセットの oplog が必要なため、変更ストリームはサポートされていません。 oplogの詳細については、レプリカセットoplog MongoDB Serverのマニュアル ページ を参照してください。
watch()
メソッドを呼び出すオブジェクトによって、変更ストリームがリッスンするイベントの範囲が決まります。
MongoCollection
でwatch()
を呼び出すと、変更ストリームによってコレクションが監視されます。
MongoDatabase
でwatch()
を呼び出すと、変更ストリームによってそのデータベース内のすべてのコレクションが監視されます。
MongoClient
でwatch()
を呼び出すと、変更ストリームによって接続された MongoDB 配置のすべての変更が監視されます。
例
この例では、 myColl
コレクションで変更ストリームを開き、変更ストリーム イベントを発生に応じて出力する方法を示します。
ドライバーは変更ストリーム イベントをChangeStreamIterable
型の変数に保存します。 次の例では、ドライバーがChangeStreamIterable
オブジェクトにDocument
型を入力することを指定します。 その結果、ドライバーは個々の変更ストリーム イベントをChangeStreamDocument
オブジェクトとして保存します。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
コレクションに対して挿入操作を実行すると、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
実行可能な例については、「 変更監視 」の使用例ページを参照してください。
watch()
メソッドの詳細については、次の API ドキュメントを参照してください。
変更ストリームへの集計演算子の適用
集計パイプラインをパラメーターとしてwatch()
メソッドに渡して、変更ストリームが受信する変更イベントを指定できます。
MongoDB Server のバージョンがサポートする集計演算子については、「変更ストリーム出力の変更 」を参照してください。
例
次のコード例は、集計パイプラインを適用して、挿入操作とアップデート操作のみの変更イベントを受け取るように変更ストリームを構成する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
コレクションに対してアップデート操作を実行すると、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
大規模な変更ストリーム イベントの分裂
MongoDB 7.0 以降では、 $changeStreamSplitLargeEvent
集計ステージを使用して 16 MB を超えるイベントを小さなフラグメントに分割できます。
厳密に必要な場合にのみ$changeStreamSplitLargeEvent
を使用してください。 たとえば、アプリケーションで完全なドキュメントの変更前または変更後のイメージが必要で、16 MB を超えるイベントが生成される場合は、 $changeStreamSplitLargeEvent
を使用します。
$changeStreamSplitLargeEvent ステージはフラグメントを順番に返します。 変更ストリーム カーソル を使用してフラグメントにアクセスできます。 各フラグメントには、次のフィールドを含むSplitEvent
オブジェクトが含まれます。
フィールド | 説明 |
---|---|
| フラグメントのインデックス(開始) |
| 分裂イベントを構成するフラグメントの合計数 |
次の例では、 $changeStreamSplitLargeEvent
集計ステージを使用して大規模なイベントを分割し、変更ストリームを変更します。
ChangeStreamIterable<Document> changeStream = collection.watch( Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
注意
集計パイプラインには$changeStreamSplitLargeEvent
ステージを 1 つだけ含めることができ、パイプラインの最後のステージである必要があります。
次の例に示すように、変更ストリーム カーソルでgetSplitEvent()
メソッドを呼び出してSplitEvent
にアクセスできます。
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
$changeStreamSplitLargeEvent
集計ステージの詳細については、 $changeStreamSplitLargetサーバーのドキュメントを参照してください。
変更前と変更後のイメージを含みます
変更イベントは、以下のデータを含めるか省略するように構成できます。
変更前のイメージ: 操作前のドキュメントのバージョンを表すドキュメント(存在する場合)
変更後のイメージ: 操作後のドキュメントのバージョンを表すドキュメント(存在する場合)
重要
配置で MongoDB v6.0 以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
変更前イメージまたは変更後イメージを含む変更ストリーム イベントを受信するには、次のアクションを実行する必要があります。
MongoDB 配置のコレクションの変更前と変更後のイメージを有効にします。
Tip
配置で変更前と変更後のイメージを有効にする方法については、サーバー マニュアルの「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。
変更前と変更後のイメージが有効になっているコレクションを作成するようにドライバーに指示する方法については、「 変更前と変更後のイメージが有効になっているコレクションの作成 」セクションを参照してください。
変更ストリームを設定して、変更前のイメージと変更後のイメージのどちらかを取得します。
Tip
変更イベントに変更前のイメージを記録するように変更ストリームを構成する方法について詳しくは、「 変更前イメージの構成例 」を参照してください。
変更イベントで変更後のイメージを記録するように変更ストリームを構成するには、「 変更後のイメージの構成例 」を参照してください。
変更前と変更後のイメージが有効になっているコレクションの作成
ドライバーを使用して、変更前イメージと変更後イメージ オプションが有効になっているコレクションを作成するには、次の例に示すように、 ChangeStreamPreAndPostImagesOptions
のインスタンスを指定し、 createCollection()
メソッドを呼び出します。
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
MongoDB Shell からcollMod
コマンドを実行すると、既存のコレクションの変更前イメージと変更後イメージ オプションを変更できます。 この操作を実行する方法については、サーバー マニュアルのcollModに関するエントリを参照してください。
警告
コレクションで変更前イメージまたは変更後イメージを有効にした場合、 collMod
を使用してこれらの設定を変更すると、そのコレクションの既存の変更ストリームが失敗する可能性があります。
変更前のイメージの構成例
次のコード例は、 myColl
コレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
上記の例では、変更ストリームがFullDocumentBeforeChange.REQUIRED
オプションを使用するように構成されています。 このオプションは、置換、アップデート、削除する変更イベントに変更前のイメージを要求するように変更ストリームを構成します。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
ドキュメントのamount
フィールドの値を150
から2000
に更新するとします。 この変更イベントにより、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
オプションのリストについては、次を参照して ください API ドキュメント。
変更後のイメージの構成例
次のコード例は、 myColl
コレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
上記の例では、変更ストリームがFullDocument.WHEN_AVAILABLE
オプションを使用するように構成されています。 このオプションは、使用可能な場合、置換および更新の変更されたドキュメントの変更後のイメージを返すように変更ストリームを構成します。
ドキュメントのcolor
フィールドの値を"purple"
から"pink"
に更新するとします。 変更イベントによって、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
オプションのリストについては、 FullDocument を参照してください API ドキュメント。