监控数据变化
Overview
在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。
使用Scala驾驶员时,可以调用 watch()
方法返回 ChangeStreamObservable
的实例。然后,您可以订阅ChangeStreamObservable
实例以查看新的数据更改,例如更新、插入和删除。
样本数据
本指南中的示例使用restaurants
sample_restaurants
Atlas示例数据集的 数据库中的 集合。要从Scala应用程序访问权限此集合,请创建一个连接到AtlasMongoClient
集群的database
,然后为 和collection
变量分配以下值:
val database: MongoDatabase = client.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
提示
要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。
一些示例使用 LatchedObserver
类的实例来处理变更流事件。该类是一个自定义观察器,用于打印变更流事件并继续监控数据更改,直到变更流完成或生成错误。要使用 LatchedObserver
类,请将以下代码粘贴到应用程序文件中:
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() }
打开变更流
要打开变更流,请调用watch()
方法。 您调用watch()
方法的实例决定了变更流监控的事件范围。 您可以对以下类的实例调用watch()
方法:
MongoClient
:监控部署中所有数据库中所有集合的更改,不包括系统集合或admin
、local
和 数据库中的集合config
MongoDatabase
:监控一个数据库中所有集合的变更MongoCollection
:监控对一个集合的更改
以下示例调用 watch()
方法在 restaurants
集合上打开变更流。此代码会创建一个 LatchedObserver
实例来接收和输出发生的变更:
val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await()
要开始监视更改,运行前面的代码。然后,在另一个Shell中运行以下代码,更新name
字段值为 "Blarney Castle"
的文档:
val filter = equal("name", "Blarney Castle") val update = set("cuisine", "American") collection.updateOne(filter, update) .subscribe((res: UpdateResult) => println(res), (e: Throwable) => println(s"There was an error: $e"))
当您运行上述代码来更新集合时,变更流应用程序会在发生变更时打印变更。打印的变更事件类似于以下输出:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
修改变更流输出
要修改变更流输出,可以将管道阶段列表作为参数传递给 watch()
方法。您可以在列表中包含以下阶段:
$addFields
或$set
:向文档添加新字段$match
:筛选文档$project
:投影文档字段的子集$replaceWith
或$replaceRoot
:将输入文档替换为指定文档$redact
:限制文档内容$unset
:从文档中删除字段
Scala驾驶员提供了 Aggregates
类,其中包括用于构建前面的管道阶段的辅助方法。
以下示例创建了一个管道,该管道使用 Aggregates.filter()
方法构建$match
阶段。然后,代码将此管道传递给 watch()
方法,并指示 watch()
仅在发生更新操作时输出事件:
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update")))) observer.await()
修改 watch() 行为
您可以通过链接 ChangeStreamObservable
类提供的方法来修改 watch()
方法的行为。下表描述了其中一些方法:
方法 | 说明 |
---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Attaches a comment to the operation. |
| Instructs the change stream to provide only changes that occurred at or after
the specified timestamp. |
| Sets the collation to use for the change stream cursor. |
有关watch()
选项的完整列表,请参阅API文档中的 ChangeStreamObservable。
包含前像和后像
重要
仅当您的部署使用MongoDB Server v6.0 或更高版本时,才能对集合启用前图像和后图像。
默认下,当您对集合执行操作时,相应的变更事件仅包括操作前后修改的字段及其值。
您可以指示watch()
方法返回文档的前像,即更改前文档的完整版本以及修改后的字段。要将前像包含在变更流事件中,请将fullDocumentBeforeChange()
方法链接到watch()
。将以下值之一传递给fullDocumentBeforeChange()
方法:
FullDocumentBeforeChange.WHEN_AVAILABLE
:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则此变更事件字段的值为null
。FullDocumentBeforeChange.REQUIRED
:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,服务器将引发错误。
您还可以指示watch()
方法返回文档的 后像 ,即除了修改后的字段之外的文档更改后的完整版本。要在变更流事件中包含后图像,请将fullDocument()
方法链接到watch()
。将以下值之一传递给fullDocument()
方法:
FullDocument.UPDATE_LOOKUP
:更改事件包括更改后某个时间点的整个已更改文档的副本。FullDocument.WHEN_AVAILABLE
:变更事件包括变更事件的已修改文档的后像。 如果后像不可用,则此变更事件字段的值为null
。FullDocument.REQUIRED
:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,服务器将引发错误。
以下示例对集合调用 watch()
方法,并通过链式调用 fullDocument()
方法包含更新文档的后像:
val observer = LatchedObserver() collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .subscribe(observer) observer.await()
当变更流应用程序在单独的Shell中运行时,使用前面的更新示例更新restaurants
集合中的文档会打印类似于以下输出的变更事件:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription= UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
提示
要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。
更多信息
要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: