Docs 菜单
Docs 主页
/ / /
Rust 驱动程序
/ / /

打开变更流

在此页面上

  • Overview
  • 样本数据
  • 打开变更流
  • 例子
  • 将聚合操作符应用于change stream
  • 例子
  • 包含前像和后像
  • 创建启用前像和后像的集合
  • 前映像配置示例
  • 图像后配置示例
  • 更多信息
  • API 文档

在本指南中,您可以了解如何使用change stream来监控数据的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以打开change stream来帮助执行以下操作:

  • 使设备能够追踪事件并做出React,例如运动检测摄像头

  • 创建监控股票价格变化的应用程序

  • 生成特定作业职位的员工活动日志

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 当连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

要了解如何打开和配置change stream,请导航至以下部分:

  • 样本数据

  • 打开变更流

  • 将聚合操作符应用于change stream

  • 包含前像和后像

  • 更多信息

本指南中的示例监控对 directorscollection的更改。假设此collection包含以下文档,这些文档被建模为结构体:

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

提示

要学习;了解如何将文档插入集合,请参阅插入文档指南。

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在CollectionDatabaseClient实例上调用watch()方法。

重要

独立运行的MongoDB部署不支持变更流,因为该功能需要副本集oplog。 要学习;了解有关oplog的更多信息,请参阅服务器手册中的副本集oplog页面。

您对其调用watch()方法的结构体类型决定了change stream侦听的事件范围。下表根据调用对象描述了watch()方法的行为:

结构类型
的行为 watch()

Collection

Monitors changes to the individual collection

Database

Monitors changes to all collections in the database

Client

Monitors all changes in the connected MongoDB deployment

以下示例在directorscollection上打开一个change stream。该代码通过访问每个ChangeStreamEvent实例的operation_typefull_document字段来打印每个更改事件的操作类型和修改后的文档:

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

如果将文档插入到directors name值为"Wes Anderson" 的collection中,则前面的代码会产生以下输出:

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

您可以将pipeline()方法链接到watch()方法,以指定变更流接收哪些变更事件。 将聚合管道作为参数传递给pipeline()

要学习;了解您的MongoDB Server版本支持哪些聚合操作符,请参阅服务器手册中的修改变更流输出

以下示例创建了一个聚合管道来筛选更新操作。 然后,代码将管道传递给pipeline()方法,将change stream配置为仅接收和打印更新操作的事件:

let mut update_change_stream = my_coll.watch()
.pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }])
.await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

如果通过增加oscar_noms字段的值来更新name值为"Todd Haynes"的文档,则前面的代码将产生以下输出:

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

提示

要了解如何执行更新操作,请参阅修改文档指南。

您可以配置变更事件以包含或省略以下数据:

  • 前图像:表示操作之前文档版本的文档(如果存在)

  • 帖子-image :表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅本页的创建启用了前图像和后图像的集合部分。

  • 配置变更流以检索前映像和后映像中的一个或两个。 在此配置期间,您可以指示驱动程序要求使用前像和帖子,或仅在可用时包含前像和帖子。

    提示

    要配置变更流以记录变更事件中的前映像,请参阅本页上的前映像配置示例

    要配置变更流以记录变更事件中的后映像,请参阅本页上的后映像配置示例

要为集合启用前像和后像文档,请使用change_stream_pre_and_post_images()选项构建器方法。 以下示例使用此构建器方法指定集合选项,并创建前像和后像可用的集合:

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
.change_stream_pre_and_post_images(enable)
.await?;

您可以通过从 MongoDB Shell 或应用程序运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 运行命令 指南和 MongoDB Server手册中有关 collMod 的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream终止。

要配置返回包含前像的变更事件的变更流,请使用full_document_before_change()选项构建器方法。 以下示例指定变更流选项并创建返回前映像文档的变更流:

let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
.full_document_before_change(pre_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

前面的示例将值FullDocumentBeforeChangeType::Required传递给full_document_before_change()选项构建器方法。 此方法将变更流配置为需要预映像来替换、更新和删除变更事件。 如果前像不可用,则驾驶员会引发错误。

如果您更新name值为"Jane Campion"的文档,则变更事件会产生以下输出:

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

要配置变更流以返回包含后图像的变更事件,请使用full_document()选项构建器方法。 以下示例指定变更流选项并创建返回图像后文档的变更流:

let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
.full_document(post_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

前面的示例将值FullDocument::WhenAvailable传递给full_document()选项构建器方法。 此方法将变更流配置为返回替换、更新和删除变更事件的后映像(如果后映像可用)。

如果删除name值为"Todd Haynes"的文档,则变更事件会产生以下输出:

Operation performed: Delete
Post-image: None

要进一步了解本指南所提及的任何方法或类型,请参阅以下 API 文档:

后退

数据游标