监控数据变化
Overview
在本指南中,您可以了解如何使用变更流来监控数据库的实时更改。 变更流是 MongoDB Server 的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。
使用C++驾驶员时,可以实例化 mongocxx::change_stream
来监控数据更改。
样本数据
本指南中的示例使用 Atlas示例数据集的sample_restaurants
数据库中的restaurants
集合。 要从C++应用程序访问权限此集合,请实例化一个连接到Atlas 集群的mongocxx::client
,并将以下值分配给db
和collection
变量:
auto db = client["sample_restaurants"]; auto collection = db["restaurants"];
要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。
打开变更流
要打开变更流,请调用watch()
方法。 您调用watch()
方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用watch()
方法:
mongocxx::client
:监控MongoDB 部署中的所有更改mongocxx::database
:监控数据库中所有集合的变更mongocxx::collection
:监控集合中的更改
以下示例在restaurants
集合上打开变更流,并在发生变更时输出变更:
auto stream = collection.watch(); while (true) { for (const auto& event : stream) { std::cout << bsoncxx::to_json(event) << std::endl; } }
要开始监视更改,请运行前面的代码。 然后,在单独的应用程序或shell中,修改 restaurants
集合。 以下示例更新了name
字段值为Blarney Castle
的文档:
auto result = collection.update_one(make_document(kvp("name", "Blarney Castle")), make_document(kvp("$set", make_document(kvp("cuisine", "Irish")))));
更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下输出:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... }, "wallTime" : { "$date" : ... }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
修改变更流输出
您可以将mongocxx::pipeline
实例作为参数传递给watch()
方法,以修改变更流输出。 以下列表包括一些mongocxx::pipeline
字段,您可以通过调用相应的 setter 方法来设立这些字段:
add_fields
:向文档添加新字段match
:筛选文档project
:投影文档字段的子集redact
:限制文档内容group
:按指定表达式对文档进行分组merge
:将结果输出到集合
提示
有关mongocxx::pipeline
字段的完整列表,请参阅 mongocxx:: 管道 API文档。
以下示例设置mongocxx::pipeline
实例的match
字段,然后将管道传递给watch()
方法。 这指示watch()
方法仅输出更新操作:
mongocxx::pipeline pipeline; pipeline.match(make_document(kvp("operationType", "update"))); auto stream = collection.watch(pipeline); while (true) { for (const auto& event : stream) { std::cout << bsoncxx::to_json(event) << std::endl; } }
修改watch()
行为
您可以通过将mongocxx::options::change_stream
类的实例作为参数传递来修改watch()
方法的行为。 下表描述了您可以在mongocxx::options::find
实例中设立的字段:
字段 | 说明 |
---|---|
full_document | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
full_document_before_change | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
resume_after | Instructs watch() to resume returning changes after the
operation specified in the resume token.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.resume_after is mutually exclusive with start_after and start_at_operation_time . |
start_after | Instructs watch() to start a new change stream after the
operation specified in the resume token. This field allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.start_after is mutually exclusive with resume_after and start_at_operation_time . |
start_at_operation_time | Instructs watch() to return only events that occur after the
specified timestamp.start_at_operation_time is mutually exclusive with resume_after and start_after . |
max_await_time_ms | Sets the maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds. |
batch_size | Sets the maximum number of change events to return in each batch of the
response from the MongoDB cluster. |
collation | Sets the collation to use for the change stream cursor. |
comment | Attaches a comment to the operation. |
包含前像和后像
重要
仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。
默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请指定mongocxx::options::change_stream
实例的full_document_before_change
或full_document
字段。
前像是文档在更改之前的完整版本。 要将前像包含在变更流事件,请将full_document_before_change
字段设立为以下字符串之一:
"whenAvailable"
:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。"required"
:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。
后像是文档更改后的完整版本。 要将后图像包含在变更流事件,请将full_document
字段设立为以下字符串之一:
"updateLookup"
:更改事件包括更改后某个时间点的整个已更改文档的副本。"whenAvailable"
:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。"required"
:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。
以下示例对集合调用watch()
方法,并通过设置mongocxx::options::change_stream
实例的full_document
字段来包含已更新文档的后像:
mongocxx::options::change_stream opts; opts.full_document("updateLookup"); auto stream = collection.watch(opts); while (true) { for (const auto& event : stream) { std::cout << bsoncxx::to_json(event) << std::endl; } }
在变更流应用程序运行的情况下,使用前面的更新示例更新restaurants
集合中的文档会打印类似于以下代码的变更事件:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : ... }, "fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" : "Irish", "grades" : [ ... ], "name" : "Blarney Castle", "restaurant_id" : "40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
提示
要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。
更多信息
要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: