Monitor Data Changes
On this page
Overview
In this guide, you can learn how to use the Kotlin Sync driver to monitor a change stream, allowing you to view real-time changes to your database. A change stream is a MongoDB Server feature that publishes data changes on a collection, database, or deployment. Your application can subscribe to a change stream and use events to perform other actions.
Sample Data
The examples in this guide use the restaurants
collection in the sample_restaurants
database from the Atlas sample datasets. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the
Get Started with Atlas guide.
The following Kotlin data class models the documents in this collection:
data class Restaurant( val name: String, val cuisine: String, )
Open a Change Stream
To open a change stream, call the watch()
method. The instance on which you
call the watch()
method on determines the scope of events that the change
stream listens for. You can call the watch()
method on instances of the following
classes:
MongoClient
: To monitor all changes in the MongoDB deploymentMongoDatabase
: To monitor changes in all collections in the databaseMongoCollection
: To monitor changes in the collection
Open a Change Stream Example
The following example opens a change stream on the restaurants
collection
and prints changes as they occur:
collection.watch().forEach { change -> println(change) }
To begin watching for changes, run the application. Then, in a separate
application or shell, perform a write operation on the restaurants
collection. The
following example updates a document in which the value of the name
is "Blarney Castle"
:
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle") val update = Updates.set(Restaurant::cuisine.name, "Irish") val result = collection.updateOne(filter, update)
When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following:
{ "_id": { ... }, "operationType": "update", "clusterTime": { ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } ... }
Modify the Change Stream Output
You can pass the pipeline
parameter to the watch()
method to modify the
change stream output. This parameter allows you to watch for only specified
change events. Format the parameter as a list of objects that each represents an
aggregation stage.
You can specify the following stages in the pipeline
parameter:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
Match Specific events Example
The following example uses the pipeline
parameter that includes a $match
to open a change stream that only records update operations:
val pipeline = listOf( Aggregates.match(Filters.eq("operationType", "update")) ) collection.watch(pipeline).forEach { change -> println(change) }
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
Modify watch() Behavior
You can modify the watch()
by chaining methods to the ChangeStreamIterable
object returned by the watch()
method call. If you don't specify any options, the
driver does not customize the operation.
The following table describes methods you can use to customize the behavior
of watch()
:
Method | Description |
---|---|
batchSize() | Sets the number of documents to return per batch. |
collation() | Specifies the kind of language collation to use when sorting
results. For more information, see Collation
in the MongoDB Server manual. |
comment() | Specifies a comment to attach to the operation. |
fullDocument() | Sets the fullDocument value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
fullDocumentBeforeChange() | Sets the fullDocumentBeforeChange value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
maxAwaitTime() | Sets the maximum await execution time on the server for this operation, in
milliseconds. |
For a complete list of methods you can use to configure the watch()
method, see
the ChangeStreamIterable
API documentation.
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, chain the fullDocumentBeforeChange()
or the fullDocument()
methods to the watch()
method.
The pre-image is the full version of a document before a change. To include the
pre-image in the change stream event, pass one of the following options to the
fullDocumentBeforeChange()
method:
FullDocumentBeforeChange.WHEN_AVAILABLE
: The change event includes a pre-image of the modified document for change events only if the pre-image is available.FullDocumentBeforeChange.REQUIRED
: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the
post-image in the change stream event, pass one of the following options to the
fullDocument()
method:
FullDocument.UPDATE_LOOKUP
: The change event includes a copy of the entire changed document from some time after the change.FullDocument.WHEN_AVAILABLE
: The change event includes a post-image of the modified document for change events only if the post-image is available.FullDocument.REQUIRED
: The change event includes a post-image of the modified document for change events. If the post-image is not available, the driver raises an error.
The following example calls the watch()
method on a collection and includes the post-image
of updated documents in the results by specifying the fullDocument
parameter:
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change -> println("Received a change: $change") }
With the change stream application running, updating a document in the
restaurants
collection by using the preceding update example prints a change event resembling the following:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{value=..., seconds=..., inc=...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: