Docs Menu
Docs Home
/ / /
Java Sync Driver
/ / /

オープンChange Streams

項目一覧

  • Overview
  • 変更ストリームを開く
  • 変更ストリームへの集計演算子の適用
  • 大規模な変更ストリーム イベントの分裂
  • 変更前と変更後のイメージを含みます

このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションが単一のコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。

アプリケーションが受信したデータをフィルタリングして変換するための集計演算子のセットを指定できます。 MongoDB 配置 v6.0 以降に接続する場合は、変更前と変更後のドキュメント データを含めるようにイベントを構成することもできます。

変更ストリームを開いて構成する方法については、次のセクションを参照してください。

  • 変更ストリームを開く

  • 変更ストリームへの集計演算子の適用

  • 変更前と変更後のイメージを含みます

変更ストリームを開いて、特定のタイプのデータ変更をサブスクライブし、アプリケーション内で変更イベントを生成できます。

変更ストリームを開くには、 MongoCollectionMongoDatabase 、またはMongoClientのインスタンスで watch()メソッドを呼び出します。

重要

スタンドアロンの MongoDB 配置では、この機能にはレプリカセットの oplog が必要なため、変更ストリームはサポートされていません。 oplogの詳細については、レプリカセットoplog MongoDB Serverのマニュアル ページ を参照してください。

watch()メソッドを呼び出すオブジェクトによって、変更ストリームがリッスンするイベントの範囲が決まります。

MongoCollectionwatch()を呼び出すと、変更ストリームによってコレクションが監視されます。

MongoDatabasewatch()を呼び出すと、変更ストリームによってそのデータベース内のすべてのコレクションが監視されます。

MongoClientwatch()を呼び出すと、変更ストリームによって接続された MongoDB 配置のすべての変更が監視されます。

この例では、 myCollコレクションで変更ストリームを開き、変更ストリーム イベントを発生に応じて出力する方法を示します。

ドライバーは変更ストリーム イベントをChangeStreamIterable型の変数に保存します。 次の例では、ドライバーがChangeStreamIterableオブジェクトにDocument型を入力することを指定します。 その結果、ドライバーは個々の変更ストリーム イベントをChangeStreamDocumentオブジェクトとして保存します。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

コレクションに対して挿入操作を実行すると、次の出力が生成されます。

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

実行可能な例については、「 変更監視 」の使用例ページを参照してください。

watch()メソッドの詳細については、次の API ドキュメントを参照してください。

集計パイプラインをパラメーターとしてwatch()メソッドに渡して、変更ストリームが受信する変更イベントを指定できます。

MongoDB Server のバージョンがサポートする集計演算子については、「変更ストリーム出力の変更 」を参照してください。

次のコード例は、集計パイプラインを適用して、挿入操作とアップデート操作のみの変更イベントを受け取るように変更ストリームを構成する方法を示しています。

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

コレクションに対してアップデート操作を実行すると、次の出力が生成されます。

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

MongoDB 7.0 以降では、 $changeStreamSplitLargeEvent集計ステージを使用して 16 MB を超えるイベントを小さなフラグメントに分割できます。

厳密に必要な場合にのみ$changeStreamSplitLargeEventを使用してください。 たとえば、アプリケーションで完全なドキュメントの変更前または変更後のイメージが必要で、16 MB を超えるイベントが生成される場合は、 $changeStreamSplitLargeEventを使用します。

$changeStreamSplitLargeEvent ステージはフラグメントを順番に返します。 変更ストリーム カーソル を使用してフラグメントにアクセスできます。 各フラグメントには、次のフィールドを含むSplitEventオブジェクトが含まれます。

フィールド
説明

fragment

フラグメントのインデックス(開始) 1

of

分裂イベントを構成するフラグメントの合計数

次の例では、 $changeStreamSplitLargeEvent集計ステージを使用して大規模なイベントを分割し、変更ストリームを変更します。

ChangeStreamIterable<Document> changeStream = collection.watch(
Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

集計パイプラインには$changeStreamSplitLargeEventステージを 1 つだけ含めることができ、パイプラインの最後のステージである必要があります。

次の例に示すように、変更ストリーム カーソルでgetSplitEvent()メソッドを呼び出してSplitEventにアクセスできます。

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

$changeStreamSplitLargeEvent集計ステージの詳細については、 $changeStreamSplitLargetサーバーのドキュメントを参照してください。

変更イベントは、以下のデータを含めるか省略するように構成できます。

  • 変更前のイメージ: 操作前のドキュメントのバージョンを表すドキュメント(存在する場合)

  • 変更後のイメージ: 操作後のドキュメントのバージョンを表すドキュメント(存在する場合)

重要

配置で MongoDB v6.0 以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。

変更前イメージまたは変更後イメージを含む変更ストリーム イベントを受信するには、次のアクションを実行する必要があります。

  • MongoDB 配置のコレクションの変更前と変更後のイメージを有効にします。

    Tip

    配置で変更前と変更後のイメージを有効にする方法については、サーバー マニュアルの「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。

    変更前と変更後のイメージが有効になっているコレクションを作成するようにドライバーに指示する方法については、「 変更前と変更後のイメージが有効になっているコレクションの作成 」セクションを参照してください。

  • 変更ストリームを設定して、変更前のイメージと変更後のイメージのどちらかを取得します。

    Tip

    変更イベントに変更前のイメージを記録するように変更ストリームを構成する方法について詳しくは、「 変更前イメージの構成例 」を参照してください。

    変更イベントで変更後のイメージを記録するように変更ストリームを構成するには、「 変更後のイメージの構成例 」を参照してください。

ドライバーを使用して、変更前イメージと変更後イメージ オプションが有効になっているコレクションを作成するには、次の例に示すように、 ChangeStreamPreAndPostImagesOptionsのインスタンスを指定し、 createCollection()メソッドを呼び出します。

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

MongoDB Shell からcollModコマンドを実行すると、既存のコレクションの変更前イメージと変更後イメージ オプションを変更できます。 この操作を実行する方法については、サーバー マニュアルのcollModに関するエントリを参照してください。

警告

コレクションで変更前イメージまたは変更後イメージを有効にした場合、 collModを使用してこれらの設定を変更すると、そのコレクションの既存の変更ストリームが失敗する可能性があります。

次のコード例は、 myCollコレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

上記の例では、変更ストリームがFullDocumentBeforeChange.REQUIREDオプションを使用するように構成されています。 このオプションは、置換、アップデート、削除する変更イベントに変更前のイメージを要求するように変更ストリームを構成します。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。

ドキュメントのamountフィールドの値を150から2000に更新するとします。 この変更イベントにより、次の出力が生成されます。

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

オプションのリストについては、次を参照して ください API ドキュメント。

次のコード例は、 myCollコレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

上記の例では、変更ストリームがFullDocument.WHEN_AVAILABLEオプションを使用するように構成されています。 このオプションは、使用可能な場合、置換および更新の変更されたドキュメントの変更後のイメージを返すように変更ストリームを構成します。

ドキュメントのcolorフィールドの値を"purple"から"pink"に更新するとします。 変更イベントによって、次の出力が生成されます。

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

オプションのリストについては、 FullDocument を参照してください API ドキュメント。

戻る

カーソルからのデータ