监控数据变化
Overview
在本指南中,您可以学习;了解如何使用Kotlin Sync驾驶员监控变更流,从而查看数据库的实时更改。 变更流是MongoDB Server的一项功能,用于发布有关集合、数据库或部署的数据更改。 您的应用程序可以订阅变更流并使用事件来执行其他操作。
样本数据
本指南中的示例使用 Atlas示例数据集的sample_restaurants
数据库中的 restaurants
集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。
以下Kotlin数据类对此集合中的文档进行建模:
data class Restaurant( val name: String, val cuisine: String, )
打开变更流
要打开变更流,请调用watch()
方法。 您调用watch()
方法的实例决定了变更流侦听的事件范围。 您可以对以下类的实例调用watch()
方法:
MongoClient
:监控 MongoDB 部署中的所有更改MongoDatabase
:监控数据库中所有集合的变更MongoCollection
:监控集合中的更改
打开变更流示例
以下示例在restaurants
集合上打开变更流,并在发生变更时打印变更:
collection.watch().forEach { change -> println(change) }
要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中对 restaurants
集合执行写入操作。 以下示例更新了name
值为"Blarney Castle"
的文档:
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle") val update = Updates.set(Restaurant::cuisine.name, "Irish") val result = collection.updateOne(filter, update)
更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下内容:
{ "_id": { ... }, "operationType": "update", "clusterTime": { ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } ... }
修改变更流输出
您可以将pipeline
参数传递给watch()
方法,以修改变更流输出。 此参数允许您仅监视指定的更改事件。 将参数格式设置为对象列表,每个对象代表一个聚合阶段。
您可以在pipeline
参数中指定以下阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
匹配特定事件 示例
以下示例使用包含$match
的pipeline
参数打开仅记录更新操作的变更流:
val pipeline = listOf( Aggregates.match(Filters.eq("operationType", "update")) ) collection.watch(pipeline).forEach { change -> println(change) }
要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。
修改 watch() 行为
您可以通过将方法链接到watch()
方法调用返回的ChangeStreamIterable
对象来修改watch()
。 如果不指定任何选项,驾驶员不会自定义操作。
下表描述了可用于自定义watch()
行为的方法:
方法 | 说明 |
---|---|
| Sets the number of documents to return per batch. |
| Specifies the kind of language collation to use when sorting
results. For more information, see Collation
in the MongoDB Server manual. |
| Specifies a comment to attach to the operation. |
| Sets the fullDocument value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
| Sets the fullDocumentBeforeChange value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
| Sets the maximum await execution time on the server for this operation, in
milliseconds. |
有关可用于配置watch()
方法的方法的完整列表,请参阅 ChangeStreamIterable API文档。
包含前像和后像
重要
仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。
默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请将fullDocumentBeforeChange()
或fullDocument()
方法链接到watch()
方法。
前像是文档在更改之前的完整版本。 要将前像包含在变更流事件中,请将以下选项之一传递给fullDocumentBeforeChange()
方法:
FullDocumentBeforeChange.WHEN_AVAILABLE
:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。FullDocumentBeforeChange.REQUIRED
:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。
后像是文档更改后的完整版本。 要将后图像包含在变更流事件中,请将以下选项之一传递给fullDocument()
方法:
FullDocument.UPDATE_LOOKUP
:更改事件包括更改后某个时间点的整个已更改文档的副本。FullDocument.WHEN_AVAILABLE
:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。FullDocument.REQUIRED
:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。
以下示例对集合调用watch()
方法,并通过指定fullDocument
参数将更新文档的后像包含在结果中:
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change -> println("Received a change: $change") }
在变更流应用程序运行的情况下,使用前面的更新示例更新restaurants
集合中的文档会打印类似于以下内容的变更事件:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{value=..., seconds=..., inc=...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。
更多信息
要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: