Docs 菜单
Docs 主页
/ / /
Scala
/

Change Streams

在此页面上

  • 先决条件
  • 连接到 MongoDB 部署
  • 观察集合中的所有更改
  • 监视数据库更改
  • 监视所有数据库的更改
  • 过滤内容

MongoDB Server 3.6版本引入了 $changeStream聚合管道操作符。

变更流提供了一种监视集合中文档变更的方法。 为了提高此新阶段的可用性, MongoCollection类型包含watch()方法。 ChangeStreamObservable实例会设置变更流,并在遇到可能可恢复的错误时自动尝试恢复。

您必须设置以下组件才能运行本指南中的代码示例:

  • 一个test.restaurants restaurants.json集合,其中填充了来自 文档资产Github 中的 文件的文档。

  • 以下 import 语句:

import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._

注意

本指南使用快速入门入门中所述的Observable隐式。

首先,连接到 MongoDB 部署,然后声明并定义MongoDatabaseMongoCollection实例。

以下代码连接到在端口27017上的localhost上运行的独立 MongoDB 部署。 然后,定义database变量以引用test数据库,并collection变量以引用restaurants集合:

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。

要创建变更流,请使用MongoCollection.watch()方法之一。

在以下示例中,变更流会打印出其观察到的所有更改:

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch

应用程序可以打开单个变更流来监视数据库的所有非系统集合。 要创建此类变更流,请使用MongoDatabase.watch()方法之一。

在以下示例中,变更流会打印出它在给定数据库上观察到的所有变更:

val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch

应用程序可以打开单个变更流来监视 MongoDB 部署中所有数据库的所有非系统集合。 要创建此类变更流,请使用MongoClient.watch()方法之一。

在以下示例中,变更流打印出它在MongoClient连接的部署中观察到的所有变更:

val observer = LatchedObserver()
client.watch().subscribe(observer)
observer.await() // Block waiting for the latch

您可以向watch()方法传递聚合阶段列表,以修改$changeStream操作符返回的数据。

注意

并非所有聚合操作符都受支持。 请参阅Change Streams MongoDB Server手册中的 以了解更多信息。

在以下示例中,变更流会打印出其观察到的与insertupdatereplacedelete操作相对应的所有更改。

首先,管道包括一个$match阶段,用于过滤operationTypeinsertupdatereplacedelete的文档。 然后,它将fullDocument设置为FullDocument.UPDATE_LOOKUP ,以便更新后的文档包含在结果中:

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch

后退

聚合框架