Docs Menu
Docs Home
/ / /
Kotlin Coroutine
/

Watch for Changes

You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them.

The change stream returns change event documents when changes occur. A change event contains information about the updated data.

Open a change stream by calling the watch() method on a MongoCollection, MongoDatabase, or MongoClient object as shown in the following code example:

val changeStream = collection.watch()

The watch() method optionally takes an aggregation pipeline which consists of an array of stages as the first parameter to filter and transform the change event output as follows:

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

The watch() method returns an instance of ChangeStreamFlow, a class that offers several methods to access, organize, and traverse the results. ChangeStreamFlow also inherits methods from its parent class Flow from the Kotlin Coroutines library.

You can call collect() on the ChangeStreamFlow to handle events as they occur. Alternatively, you can use other methods built in to Flow to work with the results.

To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamFlow object returned by watch(). See the link to the ChangeStreamFlow API documentation at the bottom of this example for more details on the available methods.

To capture events from a change stream, call the collect() method as shown below:

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

The .collect() function triggers when a change event is emitted. You can specify logic in the function to process the event document when it is received.

Note

For update operation change events, change streams only return the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamFlow object with the value FullDocument.UPDATE_LOOKUP as follows:

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

The following example application opens a change stream on the movies collection in the sample_mflix database. The application use an aggregation pipeline to filter changes based on operationType so that it only receives insert and update events. Deletes are excluded by omission. The application uses the .collect() method to receive and print the filtered change events that occur on the collection.

The application launches the collect() operation in a separate coroutine job, which allows the application to continue running while the change stream is open. Once the operations are complete, the application closes the change stream and exits.

Note

This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

For additional information on the classes and methods mentioned on this page, see the following resources:

  • Change Streams Server Manual Entry

  • Change Events Server Manual Entry

  • Aggregation Pipeline Server Manual Entry

  • Aggregation Stages Server Manual Entry

  • ChangeStreamFlow API Documentation

  • MongoCollection.watch() API Documentation

  • MongoDatabase.watch() API Documentation

  • MongoClient.watch() API Documentation

Back

Bulk Operations