Watch for Changes
You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them.
The change stream returns change event documents when changes occur. A change event contains information about the updated data.
Open a change stream by calling the watch() method on a MongoCollection, MongoDatabase, or MongoClient object, as shown in the following code example:
val changeStream = collection.watch()
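The same call can be made at a wider scope. The following is a minimal sketch, assuming the no-argument watch() overloads on MongoDatabase and MongoClient that emit events with Document payloads. The helper names watchDatabase and watchDeployment are hypothetical and exist only for illustration.

```kotlin
import com.mongodb.kotlin.client.coroutine.ChangeStreamFlow
import com.mongodb.kotlin.client.coroutine.MongoClient
import org.bson.Document

// Hypothetical helper: opens a change stream over every collection in one database
fun watchDatabase(client: MongoClient, databaseName: String): ChangeStreamFlow<Document> =
    client.getDatabase(databaseName).watch()

// Hypothetical helper: opens a change stream over the whole deployment
fun watchDeployment(client: MongoClient): ChangeStreamFlow<Document> =
    client.watch()
```

Because a Flow is cold, neither helper should start receiving events until you call collect() on the returned object.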
The watch() method optionally takes an aggregation pipeline, which consists of a list of stages, as its first parameter. The pipeline filters and transforms the change event output, as follows:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)
The watch() method returns an instance of ChangeStreamFlow, a class that offers several methods to access, organize, and traverse the results. ChangeStreamFlow also inherits methods from its parent interface, Flow, from the Kotlin Coroutines library.
You can call collect() on the ChangeStreamFlow to handle events as they occur. Alternatively, you can use other methods built in to Flow to work with the results.
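As one illustration of using Flow operators instead of a bare collect(), the sketch below gathers a fixed number of matching events and then stops. It assumes the standard filter, take, and toList operators from kotlinx.coroutines. The names firstMatching and firstTwoInserts are hypothetical helpers, not part of the driver.

```kotlin
import com.mongodb.client.model.changestream.ChangeStreamDocument
import com.mongodb.client.model.changestream.OperationType
import com.mongodb.kotlin.client.coroutine.MongoCollection
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import org.bson.Document

// Hypothetical helper: gathers the first `count` elements that satisfy
// `predicate`, then stops collecting. `count` must be positive.
suspend fun <T> Flow<T>.firstMatching(count: Int, predicate: (T) -> Boolean): List<T> =
    filter { predicate(it) }.take(count).toList()

// Suspends until two insert events have arrived, then returns them
suspend fun firstTwoInserts(
    collection: MongoCollection<Document>
): List<ChangeStreamDocument<Document>> =
    collection.watch().firstMatching(2) { it.operationType == OperationType.INSERT }
```

Call firstTwoInserts() from a coroutine, for example inside runBlocking. It does not return until two insert events have been observed.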
To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamFlow object returned by watch(). See the link to the ChangeStreamFlow API documentation at the bottom of this page for more details on the available methods.
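A minimal sketch of chaining such options follows. It assumes that ChangeStreamFlow exposes batchSize() and maxAwaitTime() as described in its API documentation. The numeric values are arbitrary, and configuredStream is a hypothetical helper name.

```kotlin
import com.mongodb.kotlin.client.coroutine.ChangeStreamFlow
import com.mongodb.kotlin.client.coroutine.MongoCollection
import org.bson.Document
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns a change stream with illustrative option values
fun configuredStream(collection: MongoCollection<Document>): ChangeStreamFlow<Document> =
    collection.watch()
        // Request at most 50 events per batch from the server
        .batchSize(50)
        // Let the server wait up to 500 ms for new events before replying
        .maxAwaitTime(500, TimeUnit.MILLISECONDS)
```

Each option method returns a ChangeStreamFlow, so the calls can be chained before you start collecting.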
Process Change Stream Events with .collect()
To capture events from a change stream, call the collect() method as shown below:
val changeStream = collection.watch()
changeStream.collect {
    println("Change observed: $it")
}
The lambda passed to collect() runs each time a change event is emitted. You can specify logic in the lambda to process the event document when it is received.
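For instance, the processing logic might branch on the kind of change. The sketch below assumes the operationType, documentKey, and updateDescription accessors of ChangeStreamDocument. The names actionFor and logChanges are hypothetical helpers introduced only for this illustration.

```kotlin
import com.mongodb.client.model.changestream.OperationType
import com.mongodb.kotlin.client.coroutine.MongoCollection
import org.bson.Document

// Hypothetical helper: maps an operation type to a short label
fun actionFor(type: OperationType?): String = when (type) {
    OperationType.INSERT -> "created"
    OperationType.UPDATE, OperationType.REPLACE -> "modified"
    OperationType.DELETE -> "removed"
    else -> "other"
}

// Suspends for as long as the stream is open; call it from a coroutine
suspend fun logChanges(collection: MongoCollection<Document>) {
    collection.watch().collect { event ->
        println("Document ${event.documentKey} was ${actionFor(event.operationType)}")
        // updateDescription is only populated for update events
        event.updateDescription?.updatedFields?.let { fields ->
            println("  updated fields: $fields")
        }
    }
}
```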
Note
For update operation change events, change streams only return the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamFlow object with the value FullDocument.UPDATE_LOOKUP as follows:
val changeStream = collection.watch()
    .fullDocument(FullDocument.UPDATE_LOOKUP)
Example
The following example application opens a change stream on the movies collection in the sample_mflix database. The application uses an aggregation pipeline to filter changes based on operationType so that it only receives insert and update events. Deletes are excluded by omission. The application uses the collect() method to receive and print the filtered change events that occur on the collection.
The application launches the collect() operation in a separate coroutine job, which allows the application to continue running while the change stream is open. Once the operations are complete, the application closes the change stream and exits.
Note
This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Movie(val title: String, val year: Int)

fun main() = runBlocking {
    // Replace the uri string with your MongoDB deployment's connection string
    val uri = "<connection string uri>"
    val mongoClient = MongoClient.create(uri)
    val database = mongoClient.getDatabase("sample_mflix")
    val collection = database.getCollection<Movie>("movies")

    // Run the change stream watcher in its own coroutine job
    val job = launch {
        // Only pass through insert and update events
        val pipeline = listOf(
            Aggregates.match(
                Filters.`in`("operationType", mutableListOf("insert", "update"))
            )
        )
        val changeStreamFlow = collection.watch(pipeline)
            .fullDocument(FullDocument.DEFAULT)
        changeStreamFlow.collect { event ->
            println("Received a change to the collection: $event")
        }
    }

    // Insert events captured by the change stream watcher
    collection.insertOne(Movie("Back to the Future", 1985))
    collection.insertOne(Movie("Freaky Friday", 2003))

    // Update event captured by the change stream watcher
    collection.updateOne(
        Filters.eq(Movie::title.name, "Back to the Future"),
        Updates.set(Movie::year.name, 1986)
    )

    // Delete event not captured by the change stream watcher
    collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))

    // Suspend instead of blocking the thread, so the watcher coroutine
    // has time to process all events
    delay(1000)

    // Cancel coroutine job to stop the change stream watcher
    job.cancel()
    mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
For additional information on the classes and methods mentioned on this page, see the following resources:
Change Streams Server Manual Entry
Change Events Server Manual Entry
Aggregation Pipeline Server Manual Entry
Aggregation Stages Server Manual Entry
ChangeStreamFlow API Documentation
MongoCollection.watch() API Documentation
MongoDatabase.watch() API Documentation
MongoClient.watch() API Documentation