
Open Change Streams

Overview

In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB server feature that allows your application to subscribe to data changes on a single collection, database, or deployment. You can specify a set of aggregation operators to filter and transform the data your application receives. When connecting to MongoDB v6.0 or later, you can configure the events to include the document data before and after the change.

Learn how to open and configure your change streams in the following sections:

  • Open a Change Stream

  • Apply Aggregation Operators to your Change Stream

  • Split Large Change Stream Events

  • Include Pre-images and Post-images

Open a Change Stream

You can open a change stream to subscribe to specific types of data changes and produce change events in your application.

To open a change stream, call the watch() method on an instance of a MongoCollection, MongoDatabase, or MongoClient.

Important

Standalone MongoDB deployments don't support change streams because the feature requires a replica set oplog. To learn more about the oplog, see the Replica Set Oplog server manual page.

The object on which you call the watch() method determines the scope of events that the change stream listens for.

If you call watch() on a MongoCollection, the change stream monitors a collection.

If you call watch() on a MongoDatabase, the change stream monitors all collections in that database.

If you call watch() on a MongoClient, the change stream monitors all changes in the connected MongoDB deployment.
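
The three scopes can be sketched side by side as follows. This is a minimal illustration rather than driver-provided code: openScopedStreams is a hypothetical helper name, and the database and collection names are placeholders.

```kotlin
import com.mongodb.kotlin.client.coroutine.ChangeStreamFlow
import com.mongodb.kotlin.client.coroutine.MongoClient
import org.bson.Document

// Hypothetical helper: opens one change stream per scope.
// "myDb" and "myChangeStreamCollection" are placeholder names.
fun openScopedStreams(client: MongoClient): List<ChangeStreamFlow<Document>> {
    val database = client.getDatabase("myDb")
    val collection = database.getCollection<Document>("myChangeStreamCollection")

    // Collection scope: events for this one collection only.
    val collectionStream: ChangeStreamFlow<Document> = collection.watch()
    // Database scope: events for every collection in "myDb".
    val databaseStream: ChangeStreamFlow<Document> = database.watch()
    // Deployment scope: events across the whole connected deployment.
    val deploymentStream: ChangeStreamFlow<Document> = client.watch()

    return listOf(collectionStream, databaseStream, deploymentStream)
}
```

Each returned value is a Flow, so no events are requested from the server until you call collect() on it.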

The following code example shows how to open a change stream and print change stream events whenever the data in the collection changes:

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
    val changeStream = collection.watch()
    changeStream.collect {
        println("Received a change event: $it")
    }
}

// Perform MongoDB operations that trigger change events...

// Cancel the change stream when you're done listening for events.
job.cancel()

An insert operation on the collection should produce output similar to the following text:

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

For a runnable example, see the Watch for Changes usage example page.

To learn more about the watch() method, see the following API documentation:

  • MongoCollection.watch()

  • MongoDatabase.watch()

  • MongoClient.watch()

Apply Aggregation Operators to your Change Stream

You can pass an aggregation pipeline as a parameter to the watch() method to specify which change events the change stream receives.

To learn which aggregation operators your MongoDB server version supports, see Modify Change Stream Output.

The following code example shows how you can apply an aggregation pipeline to configure your change stream to receive change events for only insert and update operations:

val pipeline = listOf(
    Aggregates.match(
        Filters.`in`("operationType", listOf("insert", "update"))
    )
)

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
    val changeStream = collection.watch(pipeline)
    changeStream.collect {
        println("Received a change event: $it")
    }
}

// Perform MongoDB operations that trigger change events...

// Cancel the change stream when you're done listening for events.
job.cancel()

When the change stream receives an update change event, the preceding code example outputs the following text:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...
}
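
A pipeline can also filter on the contents of the changed document, not only on the operation type. The following sketch, which assumes a hypothetical numeric population field in the watched documents, builds a $match stage that keeps only insert events whose inserted document meets a threshold. The function name largeInsertsPipeline is illustrative and not part of the driver.

```kotlin
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import org.bson.conversions.Bson

// Hypothetical helper: "population" is an assumed field name.
// Insert events carry the new document in "fullDocument",
// so the filter can reach into it with dot notation.
fun largeInsertsPipeline(minPopulation: Int): List<Bson> = listOf(
    Aggregates.match(
        Filters.and(
            Filters.eq("operationType", "insert"),
            Filters.gte("fullDocument.population", minPopulation),
        )
    )
)
```

You can pass the result to watch(), for example collection.watch(largeInsertsPipeline(500)). Because the server applies the filter, events that do not match are never sent to your application.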

Split Large Change Stream Events

When connecting to MongoDB v7.0 or later, you can use the $changeStreamSplitLargeEvent aggregation operator to split event documents that exceed 16 MB into smaller fragments.

Use the $changeStreamSplitLargeEvent operator only when you expect the change stream events to exceed the document size limit. For example, you might use this feature if your application requires full document pre-images or post-images.

A $changeStreamSplitLargeEvent aggregation stage returns fragments sequentially. You can access the fragments by using a change stream cursor. Each fragment document includes a splitEvent object that contains the following fields:

Field       Description
fragment    The index of the fragment, starting at 1
of          The total number of fragments that compose the split event

The following example opens a change stream that includes an aggregation pipeline with a $changeStreamSplitLargeEvent aggregation stage to split large events:

val pipeline = listOf(
    BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument())
)

val job = launch {
    val changeStream = collection.watch(pipeline)
    changeStream.collect {
        println("Received a change event: $it")
    }
}

Note

You can have only one $changeStreamSplitLargeEvent stage in your aggregation pipeline, and it must be the last stage in the pipeline.
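
Because fragments arrive sequentially, one way to process them is to buffer fragments until the fragment index equals the of total. The following sketch shows that grouping logic on a generic Flow. The groupFragments extension is a hypothetical helper, not a driver API, and the demo uses stand-in string events in place of real change events.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: groups consecutive fragments of one split event.
// `position` returns (fragment, of) for a fragment, or null for an
// event that was not split.
fun <E> Flow<E>.groupFragments(position: (E) -> Pair<Int, Int>?): Flow<List<E>> = flow {
    val pending = mutableListOf<E>()
    collect { event ->
        val split = position(event)
        if (split == null) {
            // Unsplit event: pass it through as a single-element group.
            emit(listOf(event))
        } else {
            pending += event
            // The last fragment has fragment == of.
            if (split.first == split.second) {
                emit(pending.toList())
                pending.clear()
            }
        }
    }
}

fun main() = runBlocking {
    // Stand-in events: a label plus an optional (fragment, of) pair.
    val events: Flow<Pair<String, Pair<Int, Int>?>> = flowOf(
        "insert" to null,
        "update-part" to (1 to 2),
        "update-part" to (2 to 2),
    )
    val groups = events.groupFragments { it.second }.toList()
    println(groups.map { it.size }) // prints [1, 2]
}
```

With the driver, and assuming your driver version exposes the splitEvent property on ChangeStreamDocument, the selector could be written as { e -> e.splitEvent?.let { it.fragment to it.of } }. The helper only groups fragments; merging their fields into a single event is left to your application.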

To learn more about the $changeStreamSplitLargeEvent aggregation operator, see $changeStreamSplitLargeEvent (aggregation) in the Server manual.

Include Pre-images and Post-images

You can configure the change event to contain or omit the following data:

  • The pre-image, which is a document that represents the version of the document before the operation, if it exists

  • The post-image, which is a document that represents the version of the document after the operation, if it exists

To receive change stream events that include a pre-image or post-image, you must connect to a MongoDB v6.0 or later deployment and set up the following:

  • Enable pre-images and post-images for the collection on your MongoDB deployment.

    Tip

    To learn how to enable these on your deployment, see the Change Streams with Document Pre- and Post-Images MongoDB server manual page.

    To learn how to instruct the driver to create a collection with pre-images and post-images enabled, see the Create a Collection with Pre-Image and Post-Images Enabled section.

  • Configure your change stream to retrieve the pre-images, the post-images, or both.

    Tip

    To configure your change stream to include the pre-image, see the Pre-image Configuration Example.

    To configure your change stream to include the post-image, see the Post-image Configuration Example.

Create a Collection with Pre-Image and Post-Images Enabled

To create a collection with the pre-image and post-image option using the driver, specify an instance of ChangeStreamPreAndPostImagesOptions and call the createCollection() method as shown in the following example:

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

You can change the pre-image and post-image option in an existing collection by running the collMod command from the MongoDB Shell. To learn how to perform this operation, see the collMod server manual documentation.

Warning

When you modify this option on a collection, any change streams open on that collection in your application may fail if configured to require receiving the pre-image or post-image.
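
If you prefer to send the collMod command from application code instead of the MongoDB Shell, one possible approach is to pass the command document to the driver's runCommand() method. The following is a sketch under that assumption. The helper names prePostImagesCommand and setPrePostImages are hypothetical.

```kotlin
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import org.bson.Document

// Hypothetical helper: builds the collMod command document that
// turns pre-images and post-images on or off for one collection.
fun prePostImagesCommand(collectionName: String, enabled: Boolean): Document =
    Document("collMod", collectionName)
        .append("changeStreamPreAndPostImages", Document("enabled", enabled))

// Hypothetical helper: sends the command and returns the server's reply.
suspend fun setPrePostImages(
    database: MongoDatabase,
    collectionName: String,
    enabled: Boolean,
): Document = database.runCommand(prePostImagesCommand(collectionName, enabled))
```

For example, calling setPrePostImages(database, "myChangeStreamCollection", true) from a coroutine would enable the option on that collection, provided the connected user is permitted to run collMod.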

Pre-image Configuration Example

The following code example shows how you can configure a change stream to include the pre-image and output the results:

val job = launch {
    val changeStream = collection.watch()
        .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
    changeStream.collect {
        println("Received a change event: $it")
    }
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

The preceding example configures the change stream to use the FullDocumentBeforeChange.REQUIRED option. This option configures the change stream to return pre-images for replace, update, and delete change events, and causes the server to raise an error if the pre-image is unavailable.

Suppose an application updated the latestVersion field of a document in a collection of software library dependencies from the value of 2.0.0 to 2.1.0. The corresponding change event output by the preceding code example should resemble the following text:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...
}

For a list of options, see the FullDocumentBeforeChange API documentation.
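
If a missing pre-image should not be treated as an error, an alternative is the FullDocumentBeforeChange.WHEN_AVAILABLE option, which leaves the pre-image unset when the server cannot provide it. The following sketch handles both cases. The function names describePreImage and printPreImages are hypothetical.

```kotlin
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.kotlin.client.coroutine.MongoCollection
import org.bson.Document

// Hypothetical formatter, kept as a pure function so it can be
// checked without a running deployment.
fun describePreImage(operation: String?, before: Document?): String =
    if (before == null) "$operation: no pre-image available"
    else "$operation: pre-image = ${before.toJson()}"

// Prints one line per change event until the calling coroutine is cancelled.
suspend fun printPreImages(collection: MongoCollection<Document>) {
    collection.watch()
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)
        .collect { event ->
            println(
                describePreImage(
                    event.operationType?.value,
                    event.fullDocumentBeforeChange,
                )
            )
        }
}
```

Insert events never carry a pre-image, so with this sketch they print the "no pre-image available" line.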

Post-image Configuration Example

The following code example shows how you can configure a change stream to include the post-image and output the results:

val job = launch {
    val changeStream = collection.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP)
    changeStream.collect {
        println("Received a change event: $it")
    }
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

The preceding example configures the change stream to use the FullDocument.UPDATE_LOOKUP option. This option configures the change stream to return both the deltas between the original and changed documents and a copy of the document at some point in time after the change occurred.

Suppose an application updated the population field of a document from the value of 800 to 950 in a collection of city census data. The corresponding change event output by the preceding code example should resemble the following text:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950},
...
}

For a list of options, see the FullDocument API documentation.
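
The two settings can be combined on one change stream so that each event carries both images when they are available. The following sketch chains both options and compares the two documents field by field. The helper names watchWithImages, changedFields, and printChangedFields are hypothetical.

```kotlin
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.kotlin.client.coroutine.ChangeStreamFlow
import com.mongodb.kotlin.client.coroutine.MongoCollection
import org.bson.Document

// Requests the post-image by lookup and the pre-image when available.
fun watchWithImages(collection: MongoCollection<Document>): ChangeStreamFlow<Document> =
    collection.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)

// Names of top-level fields whose values differ between the two images,
// including fields present in only one of them.
fun changedFields(before: Document, after: Document): Set<String> =
    (before.keys + after.keys).filter { before[it] != after[it] }.toSet()

// Prints the changed field names for every event that has both images.
suspend fun printChangedFields(collection: MongoCollection<Document>) {
    watchWithImages(collection).collect { event ->
        val before = event.fullDocumentBeforeChange
        val after = event.fullDocument
        if (before != null && after != null) {
            println(changedFields(before, after))
        }
    }
}
```

For the census update described above, changedFields would report only the population field. Keep in mind that with UPDATE_LOOKUP the post-image reflects the document at some point after the change, so it can include later modifications.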
