Docs Menu
Docs Home
/ / /
C#/.NET
/ / /

Monitor Data Changes

On this page

  • Overview
  • Sample Data
  • Open a Change Stream
  • Modify the Change Stream Output
  • Monitor Update Events Example
  • Split Large Change Events Example
  • Modify Watch() Behavior
  • Include Pre-Images and Post-Images
  • Additional Information
  • API Documentation

In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.

The examples in this guide use the sample_restaurants.restaurants collection from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Quick Start.

The examples on this page use the following Restaurant, Address, and GradeEntry classes as models:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

Note

The documents in the restaurants collection use the snake-case naming convention. The examples in this guide use a ConventionPack to deserialize the fields in the collection into Pascal case and map them to the properties in the Restaurant class.

To learn more about custom serialization, see Custom Serialization.

To open a change stream, call the Watch() or WatchAsync() method. The instance on which you call the method determines the scope of events that the change stream listens for. You can call the Watch() or WatchAsync() method on the following classes:

  • MongoClient: To monitor all changes in the MongoDB deployment

  • Database: To monitor changes in all collections in the database

  • Collection: To monitor changes in the collection

The following example opens a change stream on the restaurants collection and outputs the changes as they occur. Select the Asynchronous or Synchronous tab to see the corresponding code.

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}

To begin watching for changes, run the application. Then, in a separate application or shell, modify the restaurants collection. Updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

You can pass the pipeline parameter to the Watch() and WatchAsync() methods to modify the change stream output. This parameter allows you to watch for only specified change events. Create the pipeline by using the EmptyPipelineDefinition class and appending the relevant aggregation stage methods.

You can specify the following aggregation stages in the pipeline parameter:

  • $addFields

  • $changeStreamSplitLargeEvent

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

Tip

To learn how to build an aggregation pipeline by using the PipelineDefinitionBuilder class, see Build an Aggregation Pipeline in the Operations with Builders guide.

To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.

The following example uses the pipeline parameter to open a change stream that records only update operations. Select the Asynchronous or Synchronous tab to see the corresponding code.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}

If your application generates change events that exceed 16 MB in size, the server returns a BSONObjectTooLarge error. To avoid this error, you can use the $changeStreamSplitLargeEvent pipeline stage to split the events into smaller fragments. The .NET/C# Driver aggregation API includes the ChangeStreamSplitLargeEvent() method, which you can use to add the $changeStreamSplitLargeEvent stage to the change stream pipeline.

This example instructs the driver to watch for changes and split change events that exceed the 16 MB limit. The code prints the change document for each event and calls helper methods to reassemble any event fragments:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

Note

We recommend reassembling change event fragments, as shown in the preceding example, but this step is optional. You can use the same logic to watch both split and complete change events.

The preceding example uses the GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync(), and MergeFragment() methods to reassemble change event fragments into a single change stream document. The following code defines these methods:

// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}

Tip

To learn more about splitting large change events, see $changeStreamSplitLargeEvent in the MongoDB Server manual.

The Watch() and WatchAsync() methods accept optional parameters, which represent options you can use to configure the operation. If you don't specify any options, the driver does not customize the operation.

The following table describes the options you can set to customize the behavior of Watch() and WatchAsync():

Option
Description

FullDocument

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

FullDocumentBeforeChange

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

ResumeAfter

Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime.

StartAfter

Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime.

StartAtOperationTime

Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp.
StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter.

MaxAwaitTime

Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.

ShowExpandedEvents

Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True.

BatchSize

Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.

Collation

Specifies the collation to use for the change stream cursor.

Comment

Attaches a comment to the operation.

Important

You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, create a ChangeStreamOptions object and specify the FullDocumentBeforeChange or the FullDocument options. Then, pass the ChangeStreamOptions object to the Watch() or WatchAsync() method.

The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, set the FullDocumentBeforeChange option to one of the following values:

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: The change event includes a pre-image of the modified document for change events only if the pre-image is available.

  • ChangeStreamFullDocumentBeforeChangeOption.Required: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, the driver raises an error.

The post-image is the full version of a document after a change. To include the post-image in the change stream event, set the FullDocument option to one of the following values:

  • ChangeStreamFullDocumentOption.UpdateLookup: The change event includes a copy of the entire changed document from some time after the change.

  • ChangeStreamFullDocumentOption.WhenAvailable: The change event includes a post-image of the modified document for change events only if the post-image is available.

  • ChangeStreamFullDocumentOption.Required: The change event includes a post-image of the modified document for change events. If the post-image is not available, the driver raises an error.

The following example opens a change stream on a collection and includes the post-image of updated documents by specifying the FullDocument option. Select the Asynchronous or Synchronous tab to see the corresponding code.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}

Running the preceding code example and updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.

To learn more about change streams, see Change Streams in the MongoDB Server manual.

To learn more about any of the methods or types discussed in this guide, see the following API documentation:

Back

Retrieve Distinct Field Values