Docs Menu
Docs Home
/ / /
Scala
/

変更ストリーム

項目一覧

  • 前提条件
  • MongoDB 配置への接続
  • コレクションの変更の監視
  • データベースの変更の監視
  • すべてのデータベースでの変更の監視
  • コンテンツのフィルタリング

MongoDB Server バージョン3.6では、 $changeStream集計パイプライン演算子が導入されています。

変更ストリームは、コレクション内のドキュメントに対する変更を監視する方法を提供します。 この新しいステージの使いやすさを向上させるため、 MongoCollectionタイプにはwatch()メソッドが含まれています。 ChangeStreamObservableインスタンスは変更ストリームを設定し、回復可能なエラーが発生した場合は自動的に再開を試みます。

このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。

  • のGithub test.restaurantsドキュメントrestaurants.json アセットの ファイルのドキュメントを入力する コレクション。

  • 次のインポート ステートメントは次のとおりです。

import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._

注意

このガイドでは、 クイック スタート プライマリで説明されているObservable暗黙を使用します。

まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase MongoCollectionインスタンスを 宣言して定義します。

次のコードは、ポート27017localhostで実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 testデータベースを参照するためのdatabase変数と、 restaurantsコレクションを参照するためのcollection変数を定義します。

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。

変更ストリームを作成するには、 MongoCollection.watch()メソッドのいずれかを使用します。

次の例では、変更ストリームは、観察されたすべての変更を出力します。

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch

アプリケーションは 1 つの変更ストリームを開いて、データベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoDatabase.watch()メソッドのいずれかを使用します。

次の例では、変更ストリームは、指定されたデータベースで観察されたすべての変更を出力します。

val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch

アプリケーションは 1 つの変更ストリームを開いて、MongoDB 配置内のすべてのデータベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoClient.watch()メソッドのいずれかを使用します。

次の例では、変更ストリームは、 MongoClientが接続されている配置で観察されたすべての変更を出力します。

val observer = LatchedObserver()
client.watch().subscribe(observer)
observer.await() // Block waiting for the latch

集計ステージのリストをwatch()メソッドに渡して、 $changeStream演算子によって返されるデータを変更できます。

注意

すべての集計演算子がサポートされているわけではありません。 詳細については、サーバー マニュアルの「 Change Streams 」を参照してください。

次の例では、変更ストリームは、 insertupdatereplacedeleteの操作に対応するために観察されたすべての変更を出力します。

まず、パイプラインにはoperationTypeinsertupdatereplace 、またはdeleteのいずれかであるドキュメントをフィルタリングする$matchステージが含まれています。 次に、 fullDocumentFullDocument.UPDATE_LOOKUPに設定し、更新後のドキュメントが結果に含まれるようにします。

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch

戻る

集計フレームワーク