Change Streams
MongoDB Server 3.6版本引入了 $changeStream
聚合管道操作符。
变更流提供了一种监视集合中文档变更的方法。 为了提高此新阶段的可用性, MongoCollection
类型包含watch()
方法。 ChangeStreamObservable
实例会设置变更流,并在遇到可能可恢复的错误时自动尝试恢复。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
A
test.restaurants
集合 populated with documents from therestaurants.json
文件 in the documentation assets Github.以下 import 语句:
import java.util.concurrent.CountDownLatch import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.changestream._
注意
本指南使用快速入门入门中所述的Observable
隐式。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase
和MongoCollection
实例。
以下代码连接到在端口27017
上的localhost
上运行的独立 MongoDB 部署。 然后,定义database
变量以引用test
数据库,并collection
变量以引用restaurants
集合:
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants")
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
观察集合中的所有更改
要创建变更流,请使用MongoCollection.watch()
方法之一。
在以下示例中,变更流会打印出其观察到的所有更改:
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() } val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await() // Block waiting for the latch
监视数据库更改
应用程序可以打开单个变更流来监视数据库的所有非系统集合。 要创建此类变更流,请使用MongoDatabase.watch()
方法之一。
在以下示例中,变更流会打印出它在给定数据库上观察到的所有变更:
val observer = LatchedObserver() database.watch().subscribe(observer) observer.await() // Block waiting for the latch
监视所有数据库的更改
应用程序可以打开单个变更流来监视 MongoDB 部署中所有数据库的所有非系统集合。 要创建此类变更流,请使用MongoClient.watch()
方法之一。
在以下示例中,变更流打印出它在MongoClient
连接的部署中观察到的所有变更:
val observer = LatchedObserver() client.watch().subscribe(observer) observer.await() // Block waiting for the latch
过滤内容
您可以向watch()
方法传递聚合阶段列表,以修改$changeStream
操作符返回的数据。
注意
并非所有聚合操作符都受支持。 请参阅Change Streams MongoDB Server手册中的 以了解更多信息。
在以下示例中,变更流会打印出其观察到的与insert
、 update
、 replace
和delete
操作相对应的所有更改。
首先,管道包括一个$match
阶段,用于过滤operationType
为insert
、 update
、 replace
或delete
的文档。 然后,它将fullDocument
设置为FullDocument.UPDATE_LOOKUP
,以便更新后的文档包含在结果中:
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete"))))) .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer) observer.await() // Block waiting for the latch