Docs Menu
Docs Home
/ / /
Java 同期
/

変更の監視

変更ストリームを開くと、コレクション、データベース、または配置への変更など、MongoDB のデータに対する変更を追跡できます。 変更ストリームを使用すると、アプリケーションはデータの変更をReactし、それに対応できます。

変更ストリームは、変更が発生したときに変更イベントドキュメントを返します。 変更イベントにはアップデートされたデータに関する情報が含まれます。

次のコード例に示すように、 MongoCollectionMongoDatabase 、またはMongoClientオブジェクトで watch()メソッドを呼び出して変更ストリームを開きます。

ChangeStreamIterable<Document> changeStream = database.watch();

watch()メソッドは、変更イベント出力を次のようにフィルタリングして変換する最初のパラメーターとして、ステージの配列で構成される集計パイプラインをオプションで使用します。

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

watch()メソッドは、結果にアクセスし、整理し、走査するためのいくつかのメソッドを提供するクラスであるChangeStreamIterableのインスタンスを返します。 ChangeStreamIterableは、コア Java インターフェースIterableを実装する親クラスであるMongoIterableからメソッドも継承します。

ChangeStreamIterableforEach()を呼び出して、イベントが発生したときに処理するか、結果を走査するために使用できるMongoCursorインスタンスを返すiterator()メソッドを使用できます。

追加の結果が存在するかどうかを確認するには、 hasNext() MongoCursor上のメソッドを呼び出します。 next()ではコレクション内の次のドキュメントが返され、 tryNext()では変更ストリーム内の次の利用可能な要素がすぐに返されます。またはnull 。 他のクエリによって返されるMongoCursorとは異なり、変更ストリームに関連付けられたMongoCursorは変更イベントが到達するまで待機してから、 next()から結果を返します。 その結果、変更ストリームのMongoCursorを使用してnext()を呼び出した場合、 java.util.NoSuchElementExceptionはスローされません。

変更ストリームから返されたドキュメントを処理するためのオプションを構成するには、 watch()によって返されるChangeStreamIterableオブジェクトのメンバー メソッドを使用します。 利用可能なメソッドの詳細については、この例の下部にあるChangeStreamIterable API ドキュメントへのリンクを参照してください。

変更ストリームからイベントをキャプチャするには、以下に示すようにコールバック関数を使用してforEach()メソッドを呼び出します。

changeStream.forEach(event -> System.out.println("Change observed: " + event));

コールバック関数は、変更イベントが発生したときにトリガーされます。 イベント ドキュメントを受信時に処理するよう、コールバック内でロジックを指定できます。

重要

forEach() は現在のスレッドをブロックします

forEach()への呼び出しは、対応する変更ストリームがイベントをリッスンしている限り、現在のスレッドをブロックします。 リクエストの処理やユーザー入力への応答など、他のロジックをプログラムで実行し続ける必要がある場合は、別のスレッドで変更ストリームを作成してリッスンすることを検討してください。

注意

アップデート操作変更イベントの場合、変更ストリームはデフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。 次のように、値FullDocument.UPDATE_LOOKUPを持つChangeStreamIterableオブジェクトのfullDocument()メンバー メソッドを呼び出すことで、ドキュメントの最新バージョンも返すように変更ストリームを構成できます。

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

次の例では、2 つの個別のアプリケーションを使用して、変更ストリームを使用して変更をリッスンする方法を示しています。

  • Watchという名前の最初のアプリケーションは、 sample_mflixデータベース内のmoviesコレクションの変更ストリームを開きます。 Watchは集計パイプラインを使用して、 operationTypeに基づいて変更をフィルタリングし、挿入と更新イベントのみを受け取るようにします(削除は省略によって除外されます)。 Watchは、コールバックを使用して、コレクションで発生するフィルタリングされた変更イベントを受信して出力します。

  • WatchCompanionという名前の 2 番目のアプリケーションは、 sample_mflixデータベース内のmoviesコレクションに単一のドキュメントを挿入します。 次に、 WatchCompanionは新しいフィールド値でドキュメントをアップデートします。 最後に、 WatchCompanionはドキュメントを削除します。

まず、 Watchを実行してコレクションの変更ストリームを開き、 forEach()メソッドを使用して変更ストリーム上でコールバックを定義します。 Watchの実行中にWatchCompanionを実行し、コレクションに対する変更を実行し、変更イベントを生成します。

注意

この例では、接続 URI を使用して MongoDB のインスタンスに接続します。 MongoDB インスタンスへの接続の詳細については、「 接続ガイド 」を参照してください。

Watch:

package usage.examples;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// variables referenced in a lambda must be final; final array gives us a mutable integer
final int[] numberOfEvents = {0};
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

上記のアプリケーションを順番に実行すると、次のようなWatchアプリケーションからの出力が表示されます。 insertupdate集計パイプラインは 操作をフィルタリングで除外するため、delete 操作と 操作のみが出力されます。

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

また、次のようなWatchCompanionアプリケーションからの出力も表示されます。

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Tip

Legacy API

レガシー API を使用している場合は、 FAQ ページ を参照して、このコード例に加える必要がある変更を確認してください。

このページで言及されているクラスとメソッドについての追加情報については、次のリソースを参照してください。

  • Change Streams Server の手動入力

  • 変更イベントサーバー マニュアル エントリ

  • 集計パイプラインサーバーのマニュアルエントリ

  • 集計ステージサーバー マニュアル エントリ

  • ChangeStreamIterable APIドキュメント

  • MongoCollection.watch() API ドキュメント

  • MongoDatabase.watch() API ドキュメント

  • MongoClient.watch()API ドキュメント

戻る

一括操作