変更の監視
変更ストリームを開くと、コレクション、データベース、または配置への変更など、MongoDB のデータに対する変更を追跡できます。 変更ストリームを使用すると、アプリケーションはデータの変更をReactし、それに対応できます。
変更ストリームは、変更が発生したときに変更イベントドキュメントを返します。 変更イベントにはアップデートされたデータに関する情報が含まれます。
次のコード例に示すように、 MongoCollection
、 MongoDatabase
、またはMongoClient
オブジェクトで watch()
メソッドを呼び出して変更ストリームを開きます。
ChangeStreamIterable<Document> changeStream = database.watch();
watch()
メソッドは、変更イベント出力を次のようにフィルタリングして変換する最初のパラメーターとして、ステージの配列で構成される集計パイプラインをオプションで使用します。
List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
watch()
メソッドは、結果にアクセスし、整理し、走査するためのいくつかのメソッドを提供するクラスであるChangeStreamIterable
のインスタンスを返します。 ChangeStreamIterable
は、コア Java インターフェースIterable
を実装する親クラスであるMongoIterable
からメソッドも継承します。
ChangeStreamIterable
でforEach()
を呼び出して、イベントが発生したときに処理するか、結果を走査するために使用できるMongoCursor
インスタンスを返すiterator()
メソッドを使用できます。
追加の結果が存在するかどうかを確認するには、 hasNext()
MongoCursor
上のメソッドを呼び出します。 next()
ではコレクション内の次のドキュメントが返され、 tryNext()
では変更ストリーム内の次の利用可能な要素がすぐに返されます。またはnull
。 他のクエリによって返されるMongoCursor
とは異なり、変更ストリームに関連付けられたMongoCursor
は変更イベントが到達するまで待機してから、 next()
から結果を返します。 その結果、変更ストリームのMongoCursor
を使用してnext()
を呼び出した場合、 java.util.NoSuchElementException
はスローされません。
変更ストリームから返されたドキュメントを処理するためのオプションを構成するには、 watch()
によって返されるChangeStreamIterable
オブジェクトのメンバー メソッドを使用します。 利用可能なメソッドの詳細については、この例の下部にあるChangeStreamIterable
API ドキュメントへのリンクを参照してください。
コールバックを使用して変更ストリーム イベントを処理する方法
変更ストリームからイベントをキャプチャするには、以下に示すようにコールバック関数を使用してforEach()
メソッドを呼び出します。
changeStream.forEach(event -> System.out.println("Change observed: " + event));
コールバック関数は、変更イベントが発生したときにトリガーされます。 イベント ドキュメントを受信時に処理するよう、コールバック内でロジックを指定できます。
重要
forEach() は現在のスレッドをブロックします
forEach()
への呼び出しは、対応する変更ストリームがイベントをリッスンしている限り、現在のスレッドをブロックします。 リクエストの処理やユーザー入力への応答など、他のロジックをプログラムで実行し続ける必要がある場合は、別のスレッドで変更ストリームを作成してリッスンすることを検討してください。
注意
アップデート操作変更イベントの場合、変更ストリームはデフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。 次のように、値FullDocument.UPDATE_LOOKUP
を持つChangeStreamIterable
オブジェクトのfullDocument()
メンバー メソッドを呼び出すことで、ドキュメントの最新バージョンも返すように変更ストリームを構成できます。
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
例
次の例では、2 つの個別のアプリケーションを使用して、変更ストリームを使用して変更をリッスンする方法を示しています。
Watch
という名前の最初のアプリケーションは、sample_mflix
データベース内のmovies
コレクションの変更ストリームを開きます。Watch
は集計パイプラインを使用して、operationType
に基づいて変更をフィルタリングし、挿入と更新イベントのみを受け取るようにします(削除は省略によって除外されます)。Watch
は、コールバックを使用して、コレクションで発生するフィルタリングされた変更イベントを受信して出力します。WatchCompanion
という名前の 2 番目のアプリケーションは、sample_mflix
データベース内のmovies
コレクションに単一のドキュメントを挿入します。 次に、WatchCompanion
は新しいフィールド値でドキュメントをアップデートします。 最後に、WatchCompanion
はドキュメントを削除します。
まず、 Watch
を実行してコレクションの変更ストリームを開き、 forEach()
メソッドを使用して変更ストリーム上でコールバックを定義します。 Watch
の実行中にWatchCompanion
を実行し、コレクションに対する変更を実行し、変更イベントを生成します。
注意
この例では、接続 URI を使用して MongoDB のインスタンスに接続します。 MongoDB インスタンスへの接続の詳細については、「 接続ガイド 」を参照してください。
Watch
:
package usage.examples; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); // variables referenced in a lambda must be final; final array gives us a mutable integer final int[] numberOfEvents = {0}; changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion
:
package usage.examples; import java.util.Arrays; import org.bson.Document; import org.bson.types.ObjectId; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Success! Inserted document id: " + insertResult.getInsertedId()); UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
上記のアプリケーションを順番に実行すると、次のようなWatch
アプリケーションからの出力が表示されます。 insert
update
集計パイプラインは 操作をフィルタリングで除外するため、delete
操作と 操作のみが出力されます。
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
また、次のようなWatchCompanion
アプリケーションからの出力も表示されます。
Success! Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
このページで言及されているクラスとメソッドについての追加情報については、次のリソースを参照してください。
Change Streams Server の手動入力
変更イベントサーバー マニュアル エントリ
集計パイプラインサーバーのマニュアルエントリ
集計ステージサーバー マニュアル エントリ
ChangeStreamIterable API ドキュメント
MongoCollection.watch() API ドキュメント
MongoDatabase.watch() API ドキュメント
MongoClient.watch()API ドキュメント