変更ストリーム
MongoDB Server バージョン3.6では、 $changeStream
集計パイプライン演算子が導入されています。
変更ストリームは、コレクション内のドキュメントに対する変更を監視する方法を提供します。 この新しいステージの使いやすさを向上させるため、 MongoCollection
タイプにはwatch()
メソッドが含まれています。 ChangeStreamObservable
インスタンスは変更ストリームを設定し、回復可能なエラーが発生した場合は自動的に再開を試みます。
前提条件
このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。
のGithub
test.restaurants
ドキュメントrestaurants.json
アセットの ファイルのドキュメントを入力する コレクション。次のインポート ステートメントは次のとおりです。
import java.util.concurrent.CountDownLatch import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.changestream._
注意
このガイドでは、 クイック スタート プライマリで説明されているObservable
暗黙を使用します。
MongoDB 配置への接続
まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase
MongoCollection
インスタンスを 宣言して定義します。
次のコードは、ポート27017
のlocalhost
で実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 test
データベースを参照するためのdatabase
変数と、 restaurants
コレクションを参照するためのcollection
変数を定義します。
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants")
MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。
コレクションの変更の監視
変更ストリームを作成するには、 MongoCollection.watch()
メソッドのいずれかを使用します。
次の例では、変更ストリームは、観察されたすべての変更を出力します。
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() } val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await() // Block waiting for the latch
データベースの変更の監視
アプリケーションは 1 つの変更ストリームを開いて、データベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoDatabase.watch()
メソッドのいずれかを使用します。
次の例では、変更ストリームは、指定されたデータベースで観察されたすべての変更を出力します。
val observer = LatchedObserver() database.watch().subscribe(observer) observer.await() // Block waiting for the latch
すべてのデータベースでの変更の監視
アプリケーションは 1 つの変更ストリームを開いて、MongoDB 配置内のすべてのデータベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoClient.watch()
メソッドのいずれかを使用します。
次の例では、変更ストリームは、 MongoClient
が接続されている配置で観察されたすべての変更を出力します。
val observer = LatchedObserver() client.watch().subscribe(observer) observer.await() // Block waiting for the latch
コンテンツのフィルタリング
集計ステージのリストをwatch()
メソッドに渡して、 $changeStream
演算子によって返されるデータを変更できます。
注意
すべての集計演算子がサポートされているわけではありません。 詳細については、サーバー マニュアルの「 Change Streams 」を参照してください。
次の例では、変更ストリームは、 insert
、 update
、 replace
、 delete
の操作に対応するために観察されたすべての変更を出力します。
まず、パイプラインにはoperationType
がinsert
、 update
、 replace
、またはdelete
のいずれかであるドキュメントをフィルタリングする$match
ステージが含まれています。 次に、 fullDocument
をFullDocument.UPDATE_LOOKUP
に設定し、更新後のドキュメントが結果に含まれるようにします。
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete"))))) .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer) observer.await() // Block waiting for the latch