Docs 菜单
Docs 主页
/ / /
C++ 驱动程序
/

监控数据变化

在此页面上

  • Overview
  • 样本数据
  • 打开变更流
  • 修改变更流输出
  • 修改 watch()行为
  • 包含前像和后像
  • 更多信息
  • API 文档

在本指南中,您可以了解如何使用变更流来监控数据库的实时更改。 变更流是 MongoDB Server 的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

使用C++驾驶员时,可以实例化 mongocxx::change_stream来监控数据更改。

本指南中的示例使用 Atlas示例数据集sample_restaurants数据库中的restaurants集合。 要从C++应用程序访问权限此集合,请实例化一个连接到Atlas 集群的mongocxx::client ,并将以下值分配给dbcollection变量:

auto db = client["sample_restaurants"];
auto collection = db["restaurants"];

要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。

要打开变更流,请调用watch()方法。 您调用watch()方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用watch()方法:

  • mongocxx::client:监控MongoDB 部署中的所有更改

  • mongocxx::database:监控数据库中所有集合的变更

  • mongocxx::collection:监控集合中的更改

以下示例在restaurants集合上打开变更流,并在发生变更时输出变更:

auto stream = collection.watch();
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

要开始监视更改,请运行前面的代码。 然后,在单独的应用程序或shell中,修改 restaurants集合。 以下示例更新了name字段值为Blarney Castle的文档:

auto result = collection.update_one(make_document(kvp("name", "Blarney Castle")),
make_document(kvp("$set",
make_document(kvp("cuisine", "Irish")))));

更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下输出:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... }, "wallTime" : { "$date" : ... }, "ns" :
{ "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" :
{ "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" :
{ "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }

您可以将mongocxx::pipeline实例作为参数传递给watch()方法,以修改变更流输出。 以下列表包括一些mongocxx::pipeline字段,您可以通过调用相应的 setter 方法来设立这些字段:

  • add_fields:向文档添加新字段

  • match:筛选文档

  • project:投影文档字段的子集

  • redact:限制文档内容

  • group:按指定表达式对文档进行分组

  • merge:将结果输出到集合

提示

有关mongocxx::pipeline 字段的完整列表,请参阅 mongocxx::管道 API文档。

以下示例设置mongocxx::pipeline实例的match字段,然后将管道传递给watch()方法。 这指示watch()方法仅输出更新操作:

mongocxx::pipeline pipeline;
pipeline.match(make_document(kvp("operationType", "update")));
auto stream = collection.watch(pipeline);
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

您可以通过将mongocxx::options::change_stream类的实例作为参数传递来修改watch()方法的行为。 下表描述了您可以在mongocxx::options::find实例中设立的字段:

字段
说明
full_document
Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
full_document_before_change
Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
resume_after
Instructs watch() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
resume_after is mutually exclusive with start_after and start_at_operation_time.
start_after
Instructs watch() to start a new change stream after the operation specified in the resume token. This field allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
start_after is mutually exclusive with resume_after and start_at_operation_time.
start_at_operation_time
Instructs watch() to return only events that occur after the specified timestamp.
start_at_operation_time is mutually exclusive with resume_after and start_after.
max_await_time_ms
Sets the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
batch_size
Sets the maximum number of change events to return in each batch of the response from the MongoDB cluster.
collation
Sets the collation to use for the change stream cursor.
comment
Attaches a comment to the operation.

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请指定mongocxx::options::change_stream实例的full_document_before_changefull_document字段。

前像是文档在更改之前的完整版本。 要将前像包含在变更流事件,请将full_document_before_change字段设立为以下字符串之一:

  • "whenAvailable":仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • "required":变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改的完整版本。 要将后图像包含在变更流事件,请将full_document字段设立为以下字符串之一:

  • "updateLookup":更改事件包括更改后某个时间点的整个已更改文档的副本。

  • "whenAvailable":仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • "required":变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例对集合调用watch()方法,并通过设置mongocxx::options::change_stream实例的full_document字段来包含已更新文档的后像:

mongocxx::options::change_stream opts;
opts.full_document("updateLookup");
auto stream = collection.watch(opts);
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

在变更流应用程序运行的情况下,使用前面的更新示例更新restaurants集合中的文档会打印类似于以下代码的变更事件:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : ... },
"fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24",
"coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" :
"Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" :
"Irish", "grades" : [ ... ], "name" : "Blarney Castle", "restaurant_id" : "40366356" },
"ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" :
{ "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" :
{ "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }

提示

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档:

后退

从游标访问数据