データの変更を監視
Overview
このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
Scalaドライバーを使用する場合、watch()
メソッドを呼び出して ChangeStreamObservable
のインスタンスを返すことができます。その後、ChangeStreamObservable
インスタンスをサブスクライブして、アップデート、挿入、削除などの新しいデータ変更を確認できます。
サンプル データ
このガイドの例では、restaurants
sample_restaurants
Atlasサンプルデータセット の データベースの コレクションを使用します。 Scalaアプリケーションからこのコレクションにアクセスするには、AtlasMongoClient
クラスターに接続する を作成し、 変数と 変数に次の値を割り当てます。database
collection
val database: MongoDatabase = client.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Tip
MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、 「Atlas を使い始める」ガイドを参照してください。
一部の例では、変更ストリームイベントを処理するために LatchedObserver
クラスのインスタンスを使用しています。このクラスは、変更ストリームイベントを出力し、ストリームが完了またはエラーを生成するまでデータ変更の監視を続けるカスタム オブザーバーです。 LatchedObserver
クラス を使用するには、次のコードをアプリケーションファイルに貼り付けます。
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() }
変更ストリームを開く
変更ストリームを開くには、 watch()
メソッドを呼び出します。 watch()
メソッドを呼び出すインスタンスによって、変更ストリームが監視するイベントの範囲が決まります。 次のクラスのインスタンスでwatch()
メソッドを呼び出すことができます。
MongoClient
: 配置内のすべてのデータベースにわたるすべてのコレクションに対する変更を監視します。ただし、システム コレクションまたはadmin
、 、 データベースのコレクションは対象外です。local
config
MongoDatabase
: 1 つのデータベース内のすべてのコレクションに対する変更を監視MongoCollection
: 1 つのコレクションの変更を監視
次の例では、 watch()
メソッドを呼び出して、restaurants
コレクションの変更ストリームを開きます。このコードでは、変更が発生したときに受信して出力する LatchedObserver
インスタンスが作成されます。
val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await()
変更の監視を開始するには、上記のコードを実行します。次に、別のシェルで次のコードを実行して、name
フィールドの値が "Blarney Castle"
であるドキュメントを更新します。
val filter = equal("name", "Blarney Castle") val update = set("cuisine", "American") collection.updateOne(filter, update) .subscribe((res: UpdateResult) => println(res), (e: Throwable) => println(s"There was an error: $e"))
上記のコードを実行してコレクション を更新すると、変更ストリームアプリケーションは変更が発生に応じて出力します。出力される変更イベントは、次の出力のようになります。
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
変更ストリーム出力の変更
変更ストリーム出力を変更するには、パイプラインステージのリストをパラメーターとして watch()
メソッドに渡します。リストには、次のステージを含めることができます。
$addFields
または$set
: ドキュメントに新しいフィールドを追加します$match
: ドキュメントをフィルタリングします$project
:ドキュメントフィールドのサブセットをプロジェクションします$replaceWith
または$replaceRoot
: 入力ドキュメントを指定されたドキュメントで置き換え$redact
: ドキュメントのコンテンツを制限します$unset
: ドキュメントからフィールドを削除します
Scalaドライバーは Aggregates
クラスを提供します。このクラスには、前述のパイプラインステージを構築するためのヘルパーメソッドが含まれています。
Tip
次の例では、 Aggregates.filter()
メソッドを使用して $match
ステージを構築するパイプラインを作成します。次に、コードはこのパイプラインをwatch()
メソッドに渡し、アップデート操作が発生した場合にのみイベントを出力するように watch()
に指示します。
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update")))) observer.await()
watch() の動作を変更する
ChangeStreamObservable
クラスによって提供されるメソッドを連鎖させることで、watch()
メソッドの動作を変更できます。次の表では、これらの方法の一部について説明しています。
方式 | 説明 |
---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Attaches a comment to the operation. |
| Instructs the change stream to provide only changes that occurred at or after
the specified timestamp. |
| Sets the collation to use for the change stream cursor. |
watch()
オプションの完全なリストについては、 APIドキュメントの「 ChangeStreamObservable 」を参照してください。
変更前と変更後のイメージを含めます
重要
配置でMongoDB Server v6.0 以降が使用されている場合にのみ、コレクションの変更前イメージと変更後イメージを有効にできます。
デフォルトでは 、コレクションに対して操作を実行すると、対応する変更イベントには操作の前後で変更されたフィールドとその値のみが含まれます。
watch()
変更されたフィールドに加えて、ドキュメントの 変更前のイメージ 、変更前のドキュメントの完全なバージョンを返すよう、 メソッドに指示できます。変更ストリームイベントに変更前のイメージを含めるには、 メソッドをfullDocumentBeforeChange()
watch()
にチェーンします。次のいずれかの値をfullDocumentBeforeChange()
メソッドに渡します。
FullDocumentBeforeChange.WHEN_AVAILABLE
: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、この変更イベントフィールドの値はnull
になります。FullDocumentBeforeChange.REQUIRED
: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、サーバーはエラーを発生させます。
また、変更されたwatch()
フィールドに加えて、ドキュメントの 変更後のイメージ 、変更後のドキュメントの完全なバージョンを返すよう、 メソッドに指示することもできます。変更ストリームイベントに変更後のイメージを含めるには、 メソッドをfullDocument()
watch()
にチェーンします。次のいずれかの値をfullDocument()
メソッドに渡します。
FullDocument.UPDATE_LOOKUP
: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。FullDocument.WHEN_AVAILABLE
: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後イメージが利用できない場合、この変更イベントフィールドの値はnull
になります。FullDocument.REQUIRED
: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、サーバーはエラーを発生させます。
次の例では、コレクションで watch()
メソッドを呼び出し、fullDocument()
メソッドを連鎖させることで更新されたドキュメントの変更後のイメージを含めています。
val observer = LatchedObserver() collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .subscribe(observer) observer.await()
restaurants
別のシェルで実行中変更ストリームアプリケーションでは、前述の更新例を使用して コレクション内のドキュメントを更新すると、次の出力のような変更イベントが出力されます。
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription= UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Tip
変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。
詳細情報
Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。