Monitor Data Changes
Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
Sample Data
The examples in this guide use the sample_restaurants.restaurants collection from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Quick Start.
The examples on this page use the following Restaurant, Address, and GradeEntry classes as models:
public class Restaurant
{
    public ObjectId Id { get; set; }

    public string Name { get; set; }

    [BsonElement("restaurant_id")]
    public string RestaurantId { get; set; }

    public string Cuisine { get; set; }

    public Address Address { get; set; }

    public string Borough { get; set; }

    public List<GradeEntry> Grades { get; set; }
}

public class Address
{
    public string Building { get; set; }

    [BsonElement("coord")]
    public double[] Coordinates { get; set; }

    public string Street { get; set; }

    [BsonElement("zipcode")]
    public string ZipCode { get; set; }
}

public class GradeEntry
{
    public DateTime Date { get; set; }

    public string Grade { get; set; }

    public float? Score { get; set; }
}
Note
The documents in the restaurants collection use the snake-case naming convention. The examples in this guide use a ConventionPack to deserialize the fields in the collection into Pascal case and map them to the properties in the Restaurant class.
To learn more about custom serialization, see Custom Serialization.
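The following sketch shows one way such a ConventionPack might be registered. It assumes the camel-case element name convention and uses "CamelCase" as an arbitrary pack name. The ConventionSetup and SampleModel names are hypothetical and exist only for this illustration. A camel-case convention maps Name to name but does not insert underscores, so a field such as restaurant_id still needs an explicit element name on its property.

```csharp
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Conventions;

// Hypothetical helper class for this sketch
public static class ConventionSetup
{
    // Registers a pack that maps Pascal-case property names to camel-case
    // element names (for example, Name -> "name"). The pack name
    // "CamelCase" is an arbitrary label chosen for this sketch.
    public static void Register()
    {
        var pack = new ConventionPack { new CamelCaseElementNameConvention() };

        // The filter applies the pack to every type
        ConventionRegistry.Register("CamelCase", pack, type => true);
    }
}

// Hypothetical stand-in model used only to show the effect of the convention
public class SampleModel
{
    public string Name { get; set; }

    public string Borough { get; set; }
}
```

Call ConventionSetup.Register() once at application startup, before the driver first serializes or deserializes your model classes.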
Open a Change Stream
To open a change stream, call the Watch() or WatchAsync() method. The instance on which you call the method determines the scope of events that the change stream listens for. You can call the Watch() or WatchAsync() method on the following classes:
MongoClient: To monitor all changes in the MongoDB deployment
Database: To monitor changes in all collections in the database
Collection: To monitor changes in the collection
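As a rough illustration of the three scopes, the following sketch opens one change stream at each level. The WatchScopes class name is hypothetical, the connection string is a placeholder, and the code assumes a deployment that supports change streams (a replica set or sharded cluster).

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper class for this sketch
public static class WatchScopes
{
    // Opens one change stream per scope. Replace the placeholder
    // connection string with your own value before running.
    public static void OpenAll()
    {
        var client = new MongoClient("<connection string>");
        var database = client.GetDatabase("sample_restaurants");
        var collection = database.GetCollection<BsonDocument>("restaurants");

        // Deployment scope: events from every database and collection
        using var deploymentCursor = client.Watch();

        // Database scope: events from every collection in sample_restaurants
        using var databaseCursor = database.Watch();

        // Collection scope: events from the restaurants collection only
        using var collectionCursor = collection.Watch();
    }
}
```

Each cursor is disposed when the method returns. In a real application, you would iterate over one of the cursors instead of opening all three.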
The following example opens a change stream on the restaurants collection and outputs the changes as they occur. The asynchronous version appears first, followed by the synchronous version.
// Asynchronous version
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");

// Opens a change stream and prints the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
    Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});

// Synchronous version
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");

// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine("Received the following type of change: " + change.BackingDocument);
    }
}
To begin watching for changes, run the application. Then, in a separate application or shell, modify the restaurants collection. Updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...), "wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [], "truncatedArrays" : [] } }
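Instead of printing the whole backing document, you can read individual fields of the event through the typed properties of ChangeStreamDocument. The following is a minimal sketch; the ChangeEventPrinter class and Summarize() method are hypothetical names introduced for this illustration.

```csharp
using MongoDB.Driver;

// Hypothetical helper class for this sketch
public static class ChangeEventPrinter
{
    // Builds a one-line summary from the typed properties of a change event
    public static string Summarize<TDocument>(ChangeStreamDocument<TDocument> change)
    {
        // OperationType corresponds to "operationType" and
        // CollectionNamespace corresponds to "ns" in the event document
        var summary = $"{change.OperationType} on {change.CollectionNamespace}";

        // DocumentKey corresponds to "documentKey"
        if (change.DocumentKey != null)
        {
            summary += $", key {change.DocumentKey}";
        }

        // UpdateDescription corresponds to "updateDescription" and is
        // expected only on update events, so check for null first
        if (change.UpdateDescription != null)
        {
            summary += $", updated fields {change.UpdateDescription.UpdatedFields}";
        }

        return summary;
    }
}
```

You could call this helper from a change stream loop, for example by passing each change to ChangeEventPrinter.Summarize() and writing the result to the console.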
Modify the Change Stream Output
You can pass the pipeline parameter to the Watch() and WatchAsync() methods to modify the change stream output. This parameter allows you to watch for only specified change events. Create the pipeline by using the EmptyPipelineDefinition class and appending the relevant aggregation stage methods.
You can specify the following aggregation stages in the pipeline parameter:
$addFields
$changeStreamSplitLargeEvent
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
Tip
To learn how to build an aggregation pipeline by using the PipelineDefinitionBuilder class, see Build an Aggregation Pipeline in the Operations with Builders guide.
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
Monitor Update Events Example
The following example uses the pipeline parameter to open a change stream that records only update operations. The asynchronous version appears first, followed by the synchronous version.
// Asynchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
    await cursor.ForEachAsync(change =>
    {
        Console.WriteLine("Received the following change: " + change);
    });
}

// Synchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine("Received the following change: " + change);
    }
}
Split Large Change Events Example
If your application generates change events that exceed 16 MB in size, the server returns a BSONObjectTooLarge error. To avoid this error, you can use the $changeStreamSplitLargeEvent pipeline stage to split the events into smaller fragments. The .NET/C# Driver aggregation API includes the ChangeStreamSplitLargeEvent() method, which you can use to add the $changeStreamSplitLargeEvent stage to the change stream pipeline.
This example instructs the driver to watch for changes and split change events that exceed the 16 MB limit. The code prints the change document for each event and calls helper methods to reassemble any event fragments:
// Asynchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
    Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

// Synchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
    Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
Note
We recommend reassembling change event fragments, as shown in the preceding example, but this step is optional. You can use the same logic to watch both split and complete change events.
The preceding example uses the GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync(), and MergeFragment() methods to reassemble change event fragments into a single change stream document. The following code defines these methods:
// Asynchronous version

// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
    IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
    var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
    while (await changeStreamEnumerator.MoveNextAsync())
    {
        var changeStreamEvent = changeStreamEnumerator.Current;
        if (changeStreamEvent.SplitEvent != null)
        {
            var fragment = changeStreamEvent;
            while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
            {
                await changeStreamEnumerator.MoveNextAsync();
                fragment = changeStreamEnumerator.Current;
                MergeFragment(changeStreamEvent, fragment);
            }
        }
        yield return changeStreamEvent;
    }
}

// Yields each event or event fragment from the cursor's batches
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
    IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
    while (await changeStreamCursor.MoveNextAsync())
    {
        foreach (var changeStreamEvent in changeStreamCursor.Current)
        {
            yield return changeStreamEvent;
        }
    }
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}

// Synchronous version

// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
    IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
    while (changeStreamEnumerator.MoveNext())
    {
        var changeStreamEvent = changeStreamEnumerator.Current;
        if (changeStreamEvent.SplitEvent != null)
        {
            var fragment = changeStreamEvent;
            while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
            {
                changeStreamEnumerator.MoveNext();
                fragment = changeStreamEnumerator.Current;
                MergeFragment(changeStreamEvent, fragment);
            }
        }
        yield return changeStreamEvent;
    }
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}
Tip
To learn more about splitting large change events, see $changeStreamSplitLargeEvent in the MongoDB Server manual.
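To make the merge step concrete, the following sketch applies the same copy-everything-except-_id-and-splitEvent rule to two hand-written BsonDocument fragments, without connecting to a server. The FragmentMergeDemo class is a hypothetical name, and the fragment contents are invented for illustration rather than taken from real server output.

```csharp
using MongoDB.Bson;

// Hypothetical demo class for this sketch
public static class FragmentMergeDemo
{
    // Copies every element except "_id" and "splitEvent" from the fragment
    // into the base event, following the same rule as MergeFragment()
    public static void Merge(BsonDocument baseEvent, BsonDocument fragment)
    {
        foreach (var element in fragment)
        {
            if (element.Name != "_id" && element.Name != "splitEvent")
            {
                baseEvent[element.Name] = element.Value;
            }
        }
    }

    // Builds two invented fragments of one event and merges the second
    // fragment into the first
    public static BsonDocument Run()
    {
        var first = new BsonDocument
        {
            { "_id", new BsonDocument("_data", "token-1") },
            { "splitEvent", new BsonDocument { { "fragment", 1 }, { "of", 2 } } },
            { "operationType", "update" }
        };
        var second = new BsonDocument
        {
            { "_id", new BsonDocument("_data", "token-2") },
            { "splitEvent", new BsonDocument { { "fragment", 2 }, { "of", 2 } } },
            { "fullDocument", new BsonDocument("name", "Blarney Castle") }
        };

        Merge(first, second);
        return first;
    }
}
```

After the merge, the base event contains the fields from both fragments but keeps the _id and splitEvent values of the first fragment, because the merge rule skips those two fields.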
Modify Watch() Behavior
The Watch() and WatchAsync() methods accept optional parameters, which represent options you can use to configure the operation. If you don't specify any options, the driver does not customize the operation.
The following table describes the options you can set to customize the behavior of Watch() and WatchAsync():
Option | Description |
---|---|
FullDocument | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
FullDocumentBeforeChange | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
ResumeAfter | Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime. |
StartAfter | Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime. |
StartAtOperationTime | Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp. StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter. |
MaxAwaitTime | Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds. |
ShowExpandedEvents | Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this option to true. |
BatchSize | Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. |
Collation | Specifies the collation to use for the change stream cursor. |
Comment | Attaches a comment to the operation. |
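As an illustration of how these options fit together, the following sketch builds a ChangeStreamOptions object that resumes after a previously saved token, then processes a single batch and returns the cursor's latest resume token. The ResumeOptions class, its method names, and the specific batch size and wait time are assumptions chosen for this example.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper class for this sketch
public static class ResumeOptions
{
    // Builds options that resume after a previously saved token.
    // Passing null leaves ResumeAfter unset.
    public static ChangeStreamOptions Build(BsonDocument savedToken)
    {
        return new ChangeStreamOptions
        {
            ResumeAfter = savedToken,
            BatchSize = 100,                        // illustrative value
            MaxAwaitTime = TimeSpan.FromSeconds(2)  // illustrative value
        };
    }

    // Reads one batch of events, prints each operation type, and returns
    // the cursor's current resume token so the caller can persist it
    public static BsonDocument ProcessOneBatch(
        IMongoCollection<BsonDocument> collection,
        BsonDocument savedToken)
    {
        using var cursor = collection.Watch(Build(savedToken));

        if (cursor.MoveNext())
        {
            foreach (var change in cursor.Current)
            {
                Console.WriteLine(change.OperationType);
            }
        }

        // The token might be null if the server has not reported one yet
        return cursor.GetResumeToken();
    }
}
```

Storing the returned token between runs lets a restarted application pass it back to Build() and continue from where it stopped, provided the corresponding entry is still available on the server.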
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, create a ChangeStreamOptions object and specify the FullDocumentBeforeChange or the FullDocument options. Then, pass the ChangeStreamOptions object to the Watch() or WatchAsync() method.
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, set the FullDocumentBeforeChange option to one of the following values:
ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: The change event includes a pre-image of the modified document for change events only if the pre-image is available.
ChangeStreamFullDocumentBeforeChangeOption.Required: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, set the FullDocument option to one of the following values:
ChangeStreamFullDocumentOption.UpdateLookup: The change event includes a copy of the entire changed document from some time after the change.
ChangeStreamFullDocumentOption.WhenAvailable: The change event includes a post-image of the modified document for change events only if the post-image is available.
ChangeStreamFullDocumentOption.Required: The change event includes a post-image of the modified document for change events. If the post-image is not available, the driver raises an error.
The following example opens a change stream on a collection and includes the post-image of updated documents by specifying the FullDocument option. The asynchronous version appears first, followed by the synchronous version.
// Asynchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
    Console.WriteLine(change.FullDocument.ToBsonDocument());
});

// Synchronous version
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using (var cursor = collection.Watch(pipeline, options))
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine(change.FullDocument.ToBsonDocument());
    }
}
Running the preceding code example and updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:
{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356", "cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
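The preceding example covers post-images only. The following sketch shows one way to request pre-images as well. It assumes that you first enable pre- and post-images on the collection by running the server's collMod command with the changeStreamPreAndPostImages setting, which this guide does not otherwise cover. The PreImageExample class and its method names are hypothetical.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper class for this sketch
public static class PreImageExample
{
    // Enables pre- and post-images on a collection by running collMod.
    // This step is an assumption of the sketch and requires suitable
    // privileges on the database.
    public static void EnablePreAndPostImages(IMongoDatabase database, string collectionName)
    {
        var command = new BsonDocument
        {
            { "collMod", collectionName },
            { "changeStreamPreAndPostImages", new BsonDocument("enabled", true) }
        };
        database.RunCommand<BsonDocument>(command);
    }

    // Requests both images, but only when the server has them available
    public static ChangeStreamOptions BuildOptions()
    {
        return new ChangeStreamOptions
        {
            FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable,
            FullDocument = ChangeStreamFullDocumentOption.WhenAvailable
        };
    }

    // Prints the pre-image and post-image of each change as it arrives.
    // This loop blocks until the change stream is closed or fails.
    public static void PrintImages(IMongoCollection<BsonDocument> collection)
    {
        using var cursor = collection.Watch(BuildOptions());
        foreach (var change in cursor.ToEnumerable())
        {
            Console.WriteLine("Before: " + change.FullDocumentBeforeChange);
            Console.WriteLine("After:  " + change.FullDocument);
        }
    }
}
```

With the WhenAvailable values, an event whose images are missing still arrives, and the corresponding property is empty. Switching to the Required values turns a missing image into an error instead.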
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: