Docs 菜单
Docs 主页
/ / /
Kotlin 协程
/ / /

打开变更流

在此页面上

  • Overview
  • 打开变更流
  • 将聚合操作符应用于change stream
  • 分割大型change stream事件
  • 包含前像和后像

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB v6.0 或更高版本时,您可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

  • 打开变更流

  • 将聚合操作符应用于change stream

  • 分割大型change stream事件

  • 包含前像和后像

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例上调用watch() MongoCollectionMongoDatabaseMongoClient方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog服务器手册页面。

您对其调用watch()方法的对象决定了change stream侦听的事件范围。

如果您对MongoCollection调用watch() ,则change stream会监控一个collection。

如果您对MongoDatabase调用watch() ,则change stream将监控该数据库中的所有collection。

如果您对MongoClient调用watch() ,则change stream会监控已连接 MongoDB 部署中的所有变更。

以下代码示例展示了如何在集合中的数据发生更改时打开变更流并打印变更流事件:

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

对collection执行插入操作时,应生成类似于以下文本的输出:

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

有关可运行的示例,请参阅监视变更用法示例页面。

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

val pipeline = listOf(
Aggregates.match(Filters.`in`("operationType",
listOf("insert", "update")))
)
// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

当变更流收到更新变更事件时,前面的代码示例将输出以下文本:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...

连接到 MongoDB v7.0 或更高版本时,可以使用$changeStreamSplitLargeEvent聚合操作符将超过 16 MB 的事件文档分割成更小的片段。

仅当您预计change stream事件会超过文档大小限制时,才使用$changeStreamSplitLargeEvent操作符。例如,如果您的应用程序需要完整的文档前图像或帖子图像,则可以使用此功能。

$changeStreamSplitLargeEvent聚合阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。每个片段文档都包括一个splitEvent对象,其中包含以下字段:

字段
说明
fragment
片段的索引,起始于 1
of
组成分割事件的分片总数

以下示例打开一个change stream,其中包括一个管道,该管道具有用于分割大型事件的$changeStreamSplitLargeEvent聚合阶段:

val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

注意

聚合管道中只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

要了解有关$changeStreamSplitLargeEvent 聚合操作符的更多信息,请参阅 MongoDB Server手册中的 $changeStreamSplitLargeEvent(聚合) 。

您可以配置变更事件以包含或省略以下数据:

  • 前像是文档,如果存在,则表示操作前的文档版本

  • 后图像是一个文档,代表操作后的文档版本(如果存在的话)

要接收包含前像或后像的变更流事件,您必须连接到 MongoDB v6.0 或更高版本的部署,并设置以下内容:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署中启用这些功能,请参阅 MongoDB Server 的Change Streams with Document Pre-and Post-Images手册页面。

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以包含前映像,请参阅前映像配置示例。

    要配置变更流以包含后映像,请参阅后映像配置示例。

要使用驱动程序创建带有帖子和collection选项的实例,请指定ChangeStreamPreAndPostImagesOptions的实例并调用createCollection()方法,如以下示例所示:

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅collMod服务器手册文档。

警告

当您在collection上修改此选项时,如果配置为需要接收帖子映像或帖子映像,则应用程序中对该collection打开的任何change stream都可能会失败。

以下代码示例展示了如何配置change stream以包含前像并输出结果:

val job = launch {
val changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。This configures the change stream to return pre-images for replace, update, and delete change events and for the server to raise an error if the pre-image is unavailable.

假设应用程序将collection中的文档的latestVersion字段的值从2.0.0更新为2.1.0 。前面的代码示例输出的相应变更事件应类似于以下文本:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...}
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...

有关选项列表,请参阅 FullDocumentBeforeChange API 文档。

以下代码示例展示了如何配置change stream以包含帖子图像并输出结果:

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

前面的示例将change stream配置为使用FullDocument.UPDATE_LOOKUP选项。这会将change stream配置为返回原始文档和已更改文档之间的增量以及更改发生后某个时间点的文档副本。

假设某个应用程序将城市人口普查数据集合中文档的population字段从值800更新为950 。 前面的代码示例输出的相应变更事件应类似于以下文本:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950}, ...
...

有关选项列表,请参阅 FullDocument API 文档。

后退

从流程访问数据