监控数据变化
Overview
在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是一项功能,允许您的应用程序订阅集合、数据库或部署上的数据更改。
样本数据
本指南中的示例使用Atlas示例数据集中的 sample_restaurants.restaurants
集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅快速入门。
本页的示例使用以下 Restaurant
、Address
和 GradeEntry
类作为模型:
public class Restaurant { public ObjectId Id { get; set; } public string Name { get; set; } [ ] public string RestaurantId { get; set; } public string Cuisine { get; set; } public Address Address { get; set; } public string Borough { get; set; } public List<GradeEntry> Grades { get; set; } }
public class Address { public string Building { get; set; } [ ] public double[] Coordinates { get; set; } public string Street { get; set; } [ ] public string ZipCode { get; set; } }
public class GradeEntry { public DateTime Date { get; set; } public string Grade { get; set; } public float? Score { get; set; } }
注意
restaurants
集合中的文档使用驼峰命名约定。本指南中的示例使用 ConventionPack
将集合中的字段反序列化为 Pascal 语句,然后映射到 Restaurant
类中的属性。
如需了解有关自定义序列化的更多信息,请参阅“自定义序列化”。
打开变更流
要打开变更流,请调用Watch()
或WatchAsync()
方法。 调用该方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用Watch()
或WatchAsync()
方法:
MongoClient
:监控 MongoDB 部署中的所有更改Database
:监控数据库中所有集合的变更Collection
:监控集合中的更改
以下示例在restaurants
集合上打开变更流,并在发生变更时输出变更。 选择 Asynchronous或Synchronous标签页以查看相应的代码。
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change streams and print the changes as they're received using var cursor = await collection.WatchAsync(); await cursor.ForEachAsync(change => { Console.WriteLine("Received the following type of change: " + change.BackingDocument); });
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change stream and prints the changes as they're received using (var cursor = collection.Watch()) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following type of change: " + change.BackingDocument); } }
要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中,修改 restaurants
集合。 更新"name"
值为"Blarney Castle"
的文档会产生以下变更流输出:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...), "wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [], "truncatedArrays" : [] } }
修改变更流输出
您可以将pipeline
参数传递给Watch()
和WatchAsync()
方法,以修改变更流输出。 此参数允许您仅监视指定的更改事件。 使用EmptyPipelineDefinition
类并附加相关聚合阶段方法来创建管道。
您可以在pipeline
参数中指定以下聚合阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
要学习;了解如何使用PipelineDefinitionBuilder
类构建聚合管道,请参阅《Operations with Builders》(操作构建者指南)中的构建聚合管道。
以下示例使用pipeline
参数打开仅记录更新操作的变更流。 选择Asynchronous或Synchronous标签页以查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change stream and prints the changes as they're received using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) { await cursor.ForEachAsync(change => { Console.WriteLine("Received the following change: " + change); }); }
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change streams and print the changes as they're received using (var cursor = _restaurantsCollection.Watch(pipeline)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following change: " + change); } }
要学习;了解有关修改变更流输出的更多信息,请参阅手册中的修改变更流输出部分。
修改 Watch()
行为
Watch()
和WatchAsync()
方法接受可选参数,这些参数表示可用于配置操作的选项。 如果不指定任何选项,驾驶员不会自定义操作。
下表描述了可以设立用于自定义Watch()
和WatchAsync()
行为的选项:
选项 | 说明 |
---|---|
FullDocument | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
FullDocumentBeforeChange | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
ResumeAfter | Directs Watch() or WatchAsync() to resume returning changes after the
operation specified in the resume token.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime . |
StartAfter | Directs Watch() or WatchAsync() to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime . |
StartAtOperationTime | Directs Watch() or WatchAsync() to return only events that occur after the
specified timestamp.StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter . |
MaxAwaitTime | Specifies the maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds. |
ShowExpandedEvents | Starting in v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To
include expanded events in a change stream, create the change stream
cursor and set this parameter to True . |
BatchSize | Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster. |
Collation | Specifies the collation to use for the change stream cursor. |
Comment | Attaches a comment to the operation. |
包含前像和后像
重要
仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。
默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请创建一个ChangeStreamOptions
对象并指定FullDocumentBeforeChange
或FullDocument
选项。 然后,将ChangeStreamOptions
对象传递给Watch()
或WatchAsync()
方法。
前像是文档在更改之前的完整版本。 要将前像包含在变更流事件,请将FullDocumentBeforeChange
选项设立为以下值之一:
ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。ChangeStreamFullDocumentBeforeChangeOption.Required
:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。
后像是文档更改后的完整版本。 要将后图像包含在变更流事件,请将FullDocument
选项设立为以下值之一:
ChangeStreamFullDocumentOption.UpdateLookup
:更改事件包括更改后某个时间点的整个已更改文档的副本。ChangeStreamFullDocumentOption.WhenAvailable
:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。ChangeStreamFullDocumentOption.Required
:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。
以下示例在集合上打开变更流,并通过指定FullDocument
选项包含更新文档的后映像。 选择Asynchronous或Synchronous标签页以查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options); await cursor.ForEachAsync(change => { Console.WriteLine(change.FullDocument.ToBsonDocument()); });
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using (var cursor = _restaurantsCollection.Watch(pipeline, options)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine(change.FullDocument.ToBsonDocument()); } }
运行前面的代码示例并更新"name"
值为"Blarney Castle"
的文档会产生以下变更流输出:
{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356", "cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }
要学习;了解有关前图像和后图像的更多信息,请参阅手册中的具有文档前图像和后图像的Change Streams 。
更多信息
要学习;了解有关变更流的更多信息,请参阅手册中的Change Streams 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: