打开变更流
Overview
在本指南中,您可以了解如何使用change stream来监控数据的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。
您可以打开change stream来帮助执行以下操作:
使设备能够追踪事件并做出React,例如运动检测摄像头
创建监控股票价格变化的应用程序
生成特定作业职位的员工活动日志
您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 当连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。
要了解如何打开和配置change stream,请导航至以下部分:
样本数据
本指南中的示例监控对 directors
collection的更改。假设此collection包含以下文档,这些文档被建模为结构体:
let docs = vec! [ Director { name: "Todd Haynes".to_string(), movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()], oscar_noms: 1, }, Director { name: "Jane Campion".to_string(), movies: vec! ["The Piano".to_string(), "Bright Star".to_string()], oscar_noms: 5, } ];
提示
要学习;了解如何将文档插入集合,请参阅插入文档指南。
打开变更流
您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。
要打开change stream,请在Collection
、 Database
或Client
实例上调用watch()
方法。
重要
独立运行的MongoDB部署不支持变更流,因为该功能需要副本集oplog。 要学习;了解有关oplog的更多信息,请参阅服务器手册中的副本集oplog页面。
您对其调用watch()
方法的结构体类型决定了change stream侦听的事件范围。下表根据调用对象描述了watch()
方法的行为:
结构类型 | 的行为 watch() |
---|---|
| Monitors changes to the individual collection |
| Monitors changes to all collections in the database |
| Monitors all changes in the connected MongoDB deployment |
例子
以下示例在directors
collection上打开一个change stream。该代码通过访问每个ChangeStreamEvent
实例的operation_type
和full_document
字段来打印每个更改事件的操作类型和修改后的文档:
let mut change_stream = my_coll.watch().await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Document: {:?}", event.full_document); }
提示
有关ChangeStreamEvent
结构体字段的完整列表,请参阅API文档中的 ChangeStreamEvent 。
如果将文档插入到directors
name
值为"Wes Anderson"
的collection中,则前面的代码会产生以下输出:
Operation performed: Insert Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
将聚合操作符应用于change stream
您可以将pipeline()
方法链接到watch()
方法,以指定变更流接收哪些变更事件。 将聚合管道作为参数传递给pipeline()
。
要学习;了解您的MongoDB Server版本支持哪些聚合操作符,请参阅服务器手册中的修改变更流输出。
例子
以下示例创建了一个聚合管道来筛选更新操作。 然后,代码将管道传递给pipeline()
方法,将change stream配置为仅接收和打印更新操作的事件:
let mut update_change_stream = my_coll.watch() .pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }]) .await?; while let Some(event) = update_change_stream.next().await.transpose()? { println!("Update performed: {:?}", event.update_description); }
如果通过增加oscar_noms
字段的值来更新name
值为"Todd Haynes"
的文档,则前面的代码将产生以下输出:
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
提示
要了解如何执行更新操作,请参阅修改文档指南。
包含前像和后像
您可以配置变更事件以包含或省略以下数据:
前图像:表示操作之前文档版本的文档(如果存在)
帖子-image :表示操作后文档版本的文档(如果存在)
重要
仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。
要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:
为 MongoDB 部署中的collection启用前像和帖子。
提示
要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流。
要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅本页的创建启用了前图像和后图像的集合部分。
配置变更流以检索前映像和后映像中的一个或两个。 在此配置期间,您可以指示驱动程序要求使用前像和帖子,或仅在可用时包含前像和帖子。
创建启用前像和后像的集合
要为集合启用前像和后像文档,请使用change_stream_pre_and_post_images()
选项构建器方法。 以下示例使用此构建器方法指定集合选项,并创建前像和后像可用的集合:
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build(); let result = my_db.create_collection("directors") .change_stream_pre_and_post_images(enable) .await?;
您可以通过从 MongoDB Shell 或应用程序运行collMod
命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 运行命令 指南和 MongoDB Server手册中有关 collMod 的条目。
警告
如果在collection上启用了前映像或帖子,则使用collMod
修改这些设置可能会导致该collection上的现有change stream终止。
前映像配置示例
要配置返回包含前像的变更事件的变更流,请使用full_document_before_change()
选项构建器方法。 以下示例指定变更流选项并创建返回前映像文档的变更流:
let pre_image = FullDocumentBeforeChangeType::Required; let mut change_stream = my_coll.watch() .full_document_before_change(pre_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Pre-image: {:?}", event.full_document_before_change); }
前面的示例将值FullDocumentBeforeChangeType::Required
传递给full_document_before_change()
选项构建器方法。 此方法将变更流配置为需要预映像来替换、更新和删除变更事件。 如果前像不可用,则驾驶员会引发错误。
如果您更新name
值为"Jane Campion"
的文档,则变更事件会产生以下输出:
Operation performed: Update Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
图像后配置示例
要配置变更流以返回包含后图像的变更事件,请使用full_document()
选项构建器方法。 以下示例指定变更流选项并创建返回图像后文档的变更流:
let post_image = FullDocumentType::WhenAvailable; let mut change_stream = my_coll.watch() .full_document(post_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Post-image: {:?}", event.full_document); }
前面的示例将值FullDocument::WhenAvailable
传递给full_document()
选项构建器方法。 此方法将变更流配置为返回替换、更新和删除变更事件的后映像(如果后映像可用)。
如果删除name
值为"Todd Haynes"
的文档,则变更事件会产生以下输出:
Operation performed: Delete Post-image: None
更多信息
API 文档
要进一步了解本指南所提及的任何方法或类型,请参阅以下 API 文档: