Monitor Data Changes
Overview
In this guide, you can learn how to monitor document changes with a change stream.
A change stream outputs new change events, providing access to real-time data changes. You can open a change stream on a collection, database, or client object.
Sample Data
To run the examples in this guide, load these documents into the db.courses collection with the following snippet:
coll := client.Database("db").Collection("courses")
docs := []interface{}{
	Course{Title: "World Fiction", Enrollment: 35},
	Course{Title: "Abstract Algebra", Enrollment: 60},
	Course{Title: "Modern Poetry", Enrollment: 12},
	Course{Title: "Plate Tectonics", Enrollment: 35},
}
result, err := coll.InsertMany(context.TODO(), docs)
Tip
Non-existent Databases and Collections
If the necessary database and collection don't exist when you perform a write operation, the server implicitly creates them.
Each document contains a description of a university course that includes the course title and maximum enrollment, corresponding to the title and enrollment fields in each document.
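The snippet above uses a Course type that this guide does not define. A minimal sketch of what it might look like follows. The field types and the explicit bson struct tags are assumptions; without tags, the Go driver's default struct encoding lowercases field names, which would produce the same title and enrollment keys. The sampleCourses helper is hypothetical and only groups the four documents in one place.

```go
package main

import "fmt"

// Course is an assumed definition of the struct used in the sample data.
// The bson tags make the mapping to the title and enrollment fields explicit.
type Course struct {
	Title      string `bson:"title"`
	Enrollment int32  `bson:"enrollment"`
}

// sampleCourses (hypothetical helper) returns the four documents that the
// guide loads into the db.courses collection.
func sampleCourses() []interface{} {
	return []interface{}{
		Course{Title: "World Fiction", Enrollment: 35},
		Course{Title: "Abstract Algebra", Enrollment: 60},
		Course{Title: "Modern Poetry", Enrollment: 12},
		Course{Title: "Plate Tectonics", Enrollment: 35},
	}
}

func main() {
	// Print each sample document; with a connected client, the same slice
	// could be passed to InsertMany as shown in the snippet above.
	for _, d := range sampleCourses() {
		c := d.(Course)
		fmt.Printf("%s: %d\n", c.Title, c.Enrollment)
	}
}
```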
Note
Each example output shows truncated _data, clusterTime, and ObjectID values because the driver generates them uniquely.
Open a Change Stream
To open a change stream, use the Watch() method. The Watch() method requires a context parameter and a pipeline parameter. To return all changes, pass in an empty Pipeline object.
Example
The following example opens a change stream on the db.courses collection and outputs all changes:
coll := client.Database("db").Collection("courses")

// open a change stream with an empty pipeline parameter
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

// iterate over the cursor to print the change-stream events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
If you modify the db.courses collection in a separate program or shell, this code prints your changes as they occur. Inserting a document with a title value of "Advanced Screenwriting" and an enrollment value of 20 results in the following change-stream event:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
Modify the Change Stream Output
Use the pipeline parameter to modify the change stream output. This parameter lets you watch for only certain change events or reshape the events that the stream returns. Format the pipeline parameter as an array of documents, with each document representing an aggregation stage.
You can use the following pipeline stages in this parameter:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
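Because only the stages listed above are accepted, it can help to check a pipeline before opening the stream. The sketch below is a hypothetical client-side pre-check, not part of the driver. It represents each stage as a plain map so that it runs without the driver installed; the same idea would apply to the key of each stage document in a real pipeline.

```go
package main

import "fmt"

// allowedStages mirrors the list of stages in this guide.
var allowedStages = map[string]bool{
	"$addFields":   true,
	"$match":       true,
	"$project":     true,
	"$replaceRoot": true,
	"$replaceWith": true,
	"$redact":      true,
	"$set":         true,
	"$unset":       true,
}

// unsupportedStages (hypothetical helper) returns the names of stages in
// pipeline that are not in allowedStages. Each stage is expected to be a
// document with a single key, the stage name.
func unsupportedStages(pipeline []map[string]interface{}) []string {
	var bad []string
	for _, stage := range pipeline {
		for name := range stage {
			if !allowedStages[name] {
				bad = append(bad, name)
			}
		}
	}
	return bad
}

func main() {
	pipeline := []map[string]interface{}{
		{"$match": map[string]interface{}{"operationType": "delete"}},
		{"$group": map[string]interface{}{"_id": "$ns.coll"}},
	}
	fmt.Println(unsupportedStages(pipeline)) // prints [$group]
}
```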
Example
The following example opens a change stream on the db database, but only watches for new delete operations:
db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
Deleting the document with the title value of "Advanced Screenwriting" in a separate program or shell results in the following change-stream event:
{"_id": {"_data": "..."},"operationType": "delete","clusterTime": {"$timestamp":{"t":"...","i":"..."}},"ns": {"db": "db","coll": "courses"}, "documentKey": {"_id": {"$oid":"..."}}}
Note
The Watch() method was called on the db database, so the code outputs new delete operations in any collection within this database.
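Since a database-level stream reports events from every collection, a consumer often needs to read the ns field to see where a change happened. The sketch below parses the JSON text of the delete event shown above with the standard library. The changeEvent struct and parseEvent helper are hypothetical names and cover only a subset of the event's fields. With a live stream, decoding the event into a struct through the stream's Decode() method would likely be the more direct route.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changeEvent (hypothetical) holds a subset of the fields in the event output.
type changeEvent struct {
	OperationType string `json:"operationType"`
	Namespace     struct {
		DB   string `json:"db"`
		Coll string `json:"coll"`
	} `json:"ns"`
}

// parseEvent decodes one JSON-formatted change event.
func parseEvent(raw string) (changeEvent, error) {
	var ev changeEvent
	err := json.Unmarshal([]byte(raw), &ev)
	return ev, err
}

func main() {
	// The delete event from the example output, with its truncated values kept as-is.
	raw := `{"_id": {"_data": "..."},"operationType": "delete","clusterTime": {"$timestamp":{"t":"...","i":"..."}},"ns": {"db": "db","coll": "courses"}, "documentKey": {"_id": {"$oid":"..."}}}`

	ev, err := parseEvent(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s on %s.%s\n", ev.OperationType, ev.Namespace.DB, ev.Namespace.Coll)
}
```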
Modify the Behavior of Watch()
Use an opts parameter to modify the behavior of the Watch() method.
You can specify the following options in the opts parameter:
ResumeAfter
StartAfter
FullDocument
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
For more information on these fields, visit the MongoDB manual.
Example
The following example calls the Watch() method on the db.courses collection. It sets the FullDocument option so that update events include a copy of the entire modified document:
coll := client.Database("db").Collection("courses")
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
Updating the enrollment value of the document with the title of "World Fiction" from 35 to 30 results in the following change-stream event:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
If you don't specify the FullDocument option, the same update operation doesn't output the "fullDocument" value.
Additional Information
For a runnable example of a change stream, see Monitor Data Changes.
For more information on change streams, see Change Streams.
API Documentation
To learn more about the Watch() method, visit the following API documentation links: