データの変更を監視
Overview
このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
サンプル データ
このガイドの例では、 Atlasサンプルデータセットの sample_restaurants.restaurants
コレクションを使用します。 無料のMongoDB Atlasクラスターを作成し、サンプルデータセットをロードする方法については、 の使用開始ガイドを参照してください。
重要
プロジェクトリ アクター ライブラリ
このガイドでは、プロジェクト Reactive ライブラリを使用して、Publisher
Java Reactive Streams ドライバー メソッドによって返された インスタンスを消費します。プロジェクト Reactor ライブラリとその使用方法の詳細については、React ドキュメントの「 使用開始 」 を参照してください。このガイドでは Project React ライブラリ メソッドをどのように使用しているかについて詳しくは、「 MongoDBへのデータの書込み 」ガイドを参照してください。
変更ストリームを開く
変更ストリームを開くには、 watch()
メソッドを呼び出します。 メソッドを呼び出すインスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスのインスタンスでwatch()
メソッドを呼び出すことができます。
MongoClient
: MongoDB 配置のすべての変更を監視MongoDatabase
: データベース内のすべてのコレクションの変更を監視するにはMongoCollection
: コレクションの変更をモニターするには
次の例では、 restaurants
コレクションの変更ストリームを開き、変更が発生に応じて出力します。
// Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更の監視を開始するには、アプリケーションを実行します。 次に、別のアプリケーションまたはshellで、restaurants
コレクションに対して書込み操作を実行します。 "name"
フィールドの値が"Blarney
Castle"
であるドキュメントを更新すると、次の変更ストリーム出力が生成されます。
Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
変更ストリーム出力の変更
集計パイプラインをパラメーターとしてwatch()
メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。
pipeline
パラメーターでは次の集計ステージを指定できます。
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
次の例では、集計パイプラインを変更ストリームに渡して、アップデート操作のみをレコードします。
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。
watch()
の動作を変更する
変更ストリーム操作を構成するために使用できるオプションを表すwatch()
メソッドにメソッドを連鎖させることができます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません 。
次の表では、 watch()
に連鎖させて動作をカスタマイズできるメソッドについて説明します。
オプション | 説明 |
---|---|
fullDocument() | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
fullDocumentBeforeChange() | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
resumeAfter() | Directs watch() to resume returning changes after the
operation specified in the resume token.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.resumeAfter() is mutually exclusive with startAfter() and startAtOperationTime() . |
startAfter() | Directs watch() to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.startAfter() is mutually exclusive with resumeAfter() and startAtOperationTime() . |
startAtOperationTime() | Directs watch() to return only events that occur after the
specified timestamp.startAtOperationTime() is mutually exclusive with resumeAfter() and startAfter() . |
maxAwaitTime() | Specifies the maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds. |
showExpandedEvents() | Starting in MongoDB Server v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To
include expanded events in a change stream, call this method and pass
in the value, true . |
batchSize() | Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster. |
collation() | Specifies the collation to use for the change stream cursor. |
comment() | Attaches a comment to the operation. |
変更前と変更後のイメージを含めます
重要
配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
デフォルトでは 、コレクションに対して操作を実行すると、対応する変更イベントには、その操作によって変更されたフィールドのデルタのみが含まれます。 変更前または変更後の完全なドキュメントを表示するには、 fullDocumentBeforeChange()
メソッドまたはfullDocument()
メソッドをwatch()
メソッドにチェーンします。
変更前のイメージは、変更前のドキュメントの完全なバージョンです。 変更ストリームイベントに変更前のイメージを含めるには、次のいずれかの値をfullDocumentBeforeChange()
メソッドに渡します。
FullDocumentBeforeChange.WHEN_AVAILABLE
: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。FullDocumentBeforeChange.REQUIRED
: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
変更後のイメージとは、変更後のドキュメントの完全なバージョンです。 変更ストリームイベントに変更後のイメージを含めるには、次のいずれかの値をfullDocument()
メソッドに渡します。
FullDocument.UPDATE_LOOKUP
: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。FullDocument.WHEN_AVAILABLE
: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。FullDocument.REQUIRED
: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。
次の例では、コレクションの変更ストリームを開き、 fullDocument()
メソッドをwatch()
メソッドに連結して更新されたドキュメントの変更後のイメージを含めます。
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received including the full // document after the update ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。
詳細情報
Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。