打开变更流
Overview
在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。
您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。
通过以下部分了解如何打开和配置change stream:
打开变更流
您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。
要打开change stream,请在实例上调用watch()
MongoCollection
MongoDatabase
MongoClient
方法。
重要
独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。
您对其调用watch()
方法的对象决定了change stream侦听的事件范围。
如果您对MongoCollection
调用watch()
,则change stream会监控一个collection。
如果您对MongoDatabase
调用watch()
,则change stream将监控该数据库中的所有collection。
如果您对MongoClient
调用watch()
,则change stream会监控已连接 MongoDB 部署中的所有变更。
例子
myColl
此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。
驱动程序将 change stream 事件存储在类型为ChangeStreamIterable
的变量中。在以下示例中,我们指定驱动程序应使用Document
类型填充ChangeStreamIterable
对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument
对象。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
对collection执行插入操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
有关可运行的示例,请参阅监视变更用法示例页面。
要了解有关watch()
方法的详情,请参阅以下 API 文档:
将聚合操作符应用于change stream
您可以将聚合管道作为参数传递给watch()
方法,以指定change stream接收哪些事件。
要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。
例子
以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
对集合执行更新操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
分割大型change stream事件
从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent
聚合阶段将超过 16 MB 的事件分割成更小的片段。
仅在绝对必要时使用$changeStreamSplitLargeEvent
。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent
。
$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent
每个片段都包括一个包含以下字段的对象:
字段 | 说明 |
---|---|
| 片段的索引,起始于 |
| 组成分割事件的分片总数 |
以下示例通过使用$changeStreamSplitLargeEvent
聚合阶段分割大型事件来修改change stream:
ChangeStreamIterable<Document> changeStream = collection.watch( Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
注意
聚合管道中只能有一个$changeStreamSplitLargeEvent
阶段,并且它必须是管道中的最后一个阶段。
您可以在change stream游标上调用getSplitEvent()
方法来访问SplitEvent
,如以下示例所示:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
有关$changeStreamSplitLargeEvent
聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。
包含前像和后像
您可以配置变更事件以包含或省略以下数据:
前像,表示操作前文档版本的文档(如果存在)
帖子-图像,表示操作后文档版本的文档(如果存在)
重要
仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。
要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:
为 MongoDB 部署中的collection启用前像和帖子。
提示
要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流。
要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。
配置你的 change stream 以检索前映像和后映像中的一个或两个。
创建启用前像和后像的集合
要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions
createCollection()
方法,如以下示例所示:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
您可以通过从 MongoDB Shell 运行collMod
命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。
警告
如果在collection上启用了前映像或帖子,则使用collMod
修改这些设置可能会导致该collection上的现有change stream失败。
前像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED
选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。
假设您将文档中amount
字段的值从150
更新为2000
。 此变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
有关选项列表,请参阅 FullDocumentBeforeChange API 文档。
帖子图像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE
选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。
假设您将文档中color
字段的值从"purple"
更新为"pink"
。 变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
有关选项列表,请参阅 FullDocument API 文档。