Monitor Data Changes
Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
When using the Ruby driver, you can call the watch method to return a Mongo::Collection::View::ChangeStream object. Then, you can iterate through its content to monitor data changes, such as updates, insertions, and deletions.
Sample Data
The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To access this collection from your Ruby application, create a Mongo::Client object that connects to an Atlas cluster and assign the following values to your database and collection variables:
    database = client.use('sample_restaurants')
    collection = database[:restaurants]
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.
Open a Change Stream
To open a change stream, call the watch method. The object on which you call the watch method determines the scope of events that the change stream monitors. You can call the watch method on the following objects:
- Mongo::Client: Monitors changes to all collections across all databases in a deployment, excluding system collections and collections in the admin, local, and config databases
- Mongo::Database: Monitors changes to all collections in one database
- Mongo::Collection: Monitors changes to one collection
The following example opens a change stream on the restaurants collection and outputs changes as they occur:
    stream = collection.watch
    stream.each do |doc|
      puts doc
      break if doc['operationType'] == 'invalidate'
    end
To begin watching for changes, run the preceding code. Then, in a separate shell, modify the restaurants collection. The following example updates a document that has a name field value of 'Blarney Castle':
    collection.update_one(
      { 'name' => 'Blarney Castle' },
      { '$set' => { 'cuisine' => 'Irish' } }
    )
When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following output:
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=>#<...>, "ns"=>{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> {"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> {"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
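Printing the raw event is useful for exploration, but an application usually branches on the operationType field. The following is a minimal sketch of that idea, not part of the driver: summarize_change is a hypothetical helper, and sample_event is a hand-written hash that mirrors the shape of the preceding output so that the example runs without a database connection.

```ruby
# Hypothetical helper: turns a change event into a one-line summary.
# It reads only fields that appear in the sample output above.
def summarize_change(event)
  ns = event['ns'] || {}
  target = "#{ns['db']}.#{ns['coll']}"

  case event['operationType']
  when 'insert'
    "insert into #{target}"
  when 'update'
    fields = event.dig('updateDescription', 'updatedFields') || {}
    "update in #{target}: changed #{fields.keys.join(', ')}"
  when 'delete'
    "delete from #{target}"
  else
    "#{event['operationType']} on #{target}"
  end
end

# Stand-in event (assumption: same shape as a real update event).
sample_event = {
  'operationType' => 'update',
  'ns' => { 'db' => 'sample_restaurants', 'coll' => 'restaurants' },
  'updateDescription' => {
    'updatedFields' => { 'cuisine' => 'Irish' },
    'removedFields' => []
  }
}

puts summarize_change(sample_event)
# prints "update in sample_restaurants.restaurants: changed cuisine"
```

Inside a stream.each block, you could call the same helper on each doc instead of printing the whole event.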
Modify the Change Stream Output
To modify the change stream output, you can pass pipeline stages in an array as a parameter to the watch method. You can include the following stages in the array:
- $addFields or $set: Adds new fields to documents
- $match: Filters the documents
- $project: Projects a subset of the document fields
- $replaceWith or $replaceRoot: Replaces the input document with the specified document
- $redact: Restricts the contents of the documents
- $unset: Removes fields from documents
The following example passes a pipeline that includes the $match stage to the watch method. This instructs the watch method to output events only when update operations occur:
    pipeline = [{ '$match' => { 'operationType' => 'update' } }]
    stream = collection.watch(pipeline)
    stream.each do |doc|
      puts doc
      break if doc['operationType'] == 'invalidate'
    end
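To match more than one operation type, the $match stage can use the $in query operator. The following sketch builds such a pipeline in plain Ruby. The operation_filter helper is a hypothetical name introduced for illustration, and the watch call is left as a comment because it needs a live connection.

```ruby
# Hypothetical helper: builds a one-stage pipeline that matches any of the
# given operation types by using the $in query operator.
def operation_filter(*operation_types)
  [{ '$match' => { 'operationType' => { '$in' => operation_types } } }]
end

pipeline = operation_filter('insert', 'update')
puts pipeline.inspect

# With a live connection, pass the pipeline to watch as in the preceding example:
#   stream = collection.watch(pipeline)
```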
Modify watch Behavior
To modify the behavior of the watch method, you can pass an options hash as a parameter to watch. The following table describes some of the options that you can set:
Option | Description |
---|---|
full_document | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide. |
full_document_before_change | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide. |
resume_after | Specifies the logical starting point for the change stream. This option is mutually exclusive with start_at_operation_time. |
start_at_operation_time | Instructs the change stream to only provide changes that occurred at or after the specified timestamp. This option is mutually exclusive with resume_after. |
collation | Sets the collation to use for the change stream cursor. |
For a full list of watch options, see watch in the API documentation.
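Because resume_after and start_at_operation_time are mutually exclusive, it can help to validate the options hash before you pass it to watch. The following is a minimal sketch under stated assumptions: build_watch_options is a hypothetical helper, not a driver method, and the token value is a placeholder (a real resume token is the _id field of a change event).

```ruby
# Hypothetical helper: builds an options hash for watch and rejects the
# combination that the option descriptions call mutually exclusive.
def build_watch_options(resume_after: nil, start_at_operation_time: nil, full_document: nil)
  if resume_after && start_at_operation_time
    raise ArgumentError, 'resume_after and start_at_operation_time are mutually exclusive'
  end

  # compact drops the options that were not set.
  {
    resume_after: resume_after,
    start_at_operation_time: start_at_operation_time,
    full_document: full_document
  }.compact
end

# Placeholder token (assumption); take a real one from the _id field of an event.
token = { '_data' => '...' }
options = build_watch_options(resume_after: token, full_document: 'updateLookup')
puts options.keys.inspect

# With a live connection:
#   stream = collection.watch([], options)
```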
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
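This guide does not show how to enable pre-images and post-images on a collection. As a hedged sketch, the MongoDB Server collMod command accepts a changeStreamPreAndPostImages option. The helper name below is hypothetical, and the commented call assumes the database variable from the Sample Data section.

```ruby
# Hypothetical helper: builds the collMod command document that enables or
# disables pre-images and post-images on a collection.
# The command name must be the first key; Ruby hashes keep insertion order.
def pre_and_post_images_command(collection_name, enabled: true)
  {
    'collMod' => collection_name,
    'changeStreamPreAndPostImages' => { 'enabled' => enabled }
  }
end

command = pre_and_post_images_command('restaurants')
puts command.inspect

# With a live connection (assumption: database is set up as in Sample Data):
#   database.command(command)
```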
By default, when you update a document in a collection, the corresponding change event includes only the fields that the operation modified and their new values, not the full document.
You can instruct the watch method to return the document's pre-image, the full version of the document before changes, in addition to the modified fields. To include the pre-image in the change stream event, pass an options hash to watch that sets the full_document_before_change option. You can set this option to the following string values:
- 'whenAvailable': The change event includes a pre-image of the modified document. If the pre-image is not available, this change event field has a nil value.
- 'required': The change event includes a pre-image of the modified document. If the pre-image is not available, the server raises an error.
- 'off': (Default) The change event does not include a pre-image of the modified document.
You can also instruct the watch method to return the document's post-image, the full version of the document after changes, in addition to the modified fields. To include the post-image in the change stream event, pass an options hash to watch that sets the full_document option. You can set this option to the following string values:
- 'updateLookup': The change event includes a copy of the entire changed document from some time after the change.
- 'whenAvailable': The change event includes a post-image of the modified document. If the post-image is not available, this change event field has a nil value.
- 'required': The change event includes a post-image of the modified document. If the post-image is not available, the server raises an error.
- 'default': (Default) The change event does not include a post-image of the modified document.
The following example calls the watch method on a collection and includes the post-image of updated documents by setting the full_document option:
    options = { full_document: 'updateLookup' }
    stream = collection.watch([], options)
    stream.each do |doc|
      puts doc
      break if doc['operationType'] == 'invalidate'
    end
With the change stream application running in a separate shell, update a document in the restaurants collection by using the preceding update example. The application prints a change event that resembles the following output:
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=> #<...1>, "wallTime"=>..., "fullDocument"=>{"_id"=>BSON::ObjectId('...'), "address"=>{"building"=>"202-24", "coord"=>[-73.9250442, 40.5595462], "street"=>"Rockaway Point Boulevard", "zipcode"=>"11697"}, "borough"=>"Queens", "cuisine"=>"Irish", "grades"=>[...], "name"=>"Blarney Castle", "restaurant_id"=>"40366356"}, "ns"=> {"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> {"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> {"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
Tip
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
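When an event carries both images, you can compare them to see what changed. The following is a minimal sketch, assuming the pre-image arrives in the fullDocumentBeforeChange field and the post-image in the fullDocument field. The changed_fields helper is hypothetical, and sample_event is a hand-written stand-in whose previous cuisine value is invented for illustration.

```ruby
# Hypothetical helper: returns the top-level fields whose values differ
# between the pre-image and the post-image, as { field => [before, after] }.
# Returns nil when either image is missing, as can happen with 'whenAvailable'.
def changed_fields(event)
  before = event['fullDocumentBeforeChange']
  after = event['fullDocument']
  return nil if before.nil? || after.nil?

  (before.keys | after.keys).each_with_object({}) do |key, diff|
    diff[key] = [before[key], after[key]] unless before[key] == after[key]
  end
end

# Stand-in event (assumption); the 'Bar' value is made up for this example.
sample_event = {
  'operationType' => 'update',
  'fullDocumentBeforeChange' => { 'name' => 'Blarney Castle', 'cuisine' => 'Bar' },
  'fullDocument' => { 'name' => 'Blarney Castle', 'cuisine' => 'Irish' }
}

puts changed_fields(sample_event).inspect
```

This comparison covers top-level fields only. Nested documents such as address are compared as whole values.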
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: