Open Change Streams
Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a single collection, database, or deployment.
You can open a change stream to help perform the following actions:
Enable devices to track and react to events, such as motion-detecting cameras
Create an application that monitors changes in stock prices
Generate a log of employee activity for specific job positions
You can specify a set of aggregation operators to filter and transform the data that your application receives. When connected to a MongoDB deployment v6.0 or later, you can also configure the events to include the document data before and after the change.
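To learn how to open and configure a change stream, see the following sections of this guide: Open a Change Stream, Apply Aggregation Operators to your Change Stream, and Include Pre-Images and Post-Images.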
Sample Data
The examples in this guide monitor changes to the directors collection. Assume that this collection contains the following documents, which are modelled as structs:
let docs = vec![
    Director {
        name: "Todd Haynes".to_string(),
        movies: vec!["Far From Heaven".to_string(), "Carol".to_string()],
        oscar_noms: 1,
    },
    Director {
        name: "Jane Campion".to_string(),
        movies: vec!["The Piano".to_string(), "Bright Star".to_string()],
        oscar_noms: 5,
    },
];
Tip
To learn how to insert documents into a collection, see the Insert Documents guide.
Open a Change Stream
You can open a change stream to subscribe to specific types of data changes and produce change events in your application.
To open a change stream, call the watch() method on a Collection, Database, or Client instance.
Important
Standalone MongoDB deployments don't support change streams because the feature requires a replica set oplog. To learn more about the oplog, see the Replica Set Oplog page in the Server manual.
The struct type on which you call the watch() method determines the scope of events that the change stream listens for. The following table describes the behavior of the watch() method based on its calling object:
Struct Type | Behavior of watch() |
---|---|
Collection | Monitors changes to the individual collection |
Database | Monitors changes to all collections in the database |
Client | Monitors all changes in the connected MongoDB deployment |
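The scoping rules in the table can be modeled in a few lines of plain Rust. The following sketch is not driver code: WatchScope and covers() are hypothetical names, introduced only to illustrate which namespaces each calling object observes. It is deliberately simplified and treats every namespace as visible at the deployment scope.

```rust
/// Hypothetical model of the three scopes a change stream can have.
/// These names are illustrative and are not part of the driver API.
#[derive(Debug, Clone, PartialEq)]
pub enum WatchScope {
    /// Corresponds to calling `watch()` on a `Collection`.
    Collection { db: String, coll: String },
    /// Corresponds to calling `watch()` on a `Database`.
    Database { db: String },
    /// Corresponds to calling `watch()` on a `Client`.
    Deployment,
}

impl WatchScope {
    /// Returns true if a change made in `db.coll` falls inside this scope.
    /// Simplified assumption: the deployment scope sees every namespace.
    pub fn covers(&self, db: &str, coll: &str) -> bool {
        match self {
            WatchScope::Collection { db: d, coll: c } => d == db && c == coll,
            WatchScope::Database { db: d } => d == db,
            WatchScope::Deployment => true,
        }
    }
}
```

In this model, a scope built for the directors collection covers only changes to that collection, while a database scope also covers changes to sibling collections in the same database.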
Example
The following example opens a change stream on the directors collection. The code prints the operation type and modified document of each change event by accessing the operation_type and full_document fields of each ChangeStreamEvent instance:
use futures::StreamExt; // brings next() into scope for the change stream

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
    println!("Operation performed: {:?}", event.operation_type);
    println!("Document: {:?}", event.full_document);
}
Tip
For a full list of ChangeStreamEvent struct fields, see ChangeStreamEvent in the API documentation.
If you insert a document into the directors collection in which the name value is "Wes Anderson", the preceding code produces the following output:
Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
Apply Aggregation Operators to your Change Stream
You can chain the pipeline() method to the watch() method to specify which change events the change stream receives. Pass an aggregation pipeline as a parameter to pipeline().
To learn which aggregation operators your MongoDB Server version supports, see Modify Change Stream Output in the Server manual.
Example
The following example creates an aggregation pipeline to filter for update operations. Then, the code passes the pipeline to the pipeline() method, configuring the change stream to only receive and print change events for update operations:
let mut update_change_stream = my_coll.watch()
    .pipeline(vec![doc! { "$match": doc! { "operationType": "update" } }])
    .await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
    println!("Update performed: {:?}", event.update_description);
}
If you update the document in which the name value is "Todd Haynes" by increasing the value of the oscar_noms field, the preceding code produces the following output:
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
Tip
To learn how to perform update operations, see the Modify Documents guide.
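To make the effect of the $match stage concrete, the following sketch reproduces the same filter on the client side in plain Rust. Op, wire_name(), and match_operation() are hypothetical names used for illustration, and Op lists only four operation kinds, whereas the driver's own operation type has more. The strings returned by wire_name() are the operationType values that the filter compares against.

```rust
/// Hypothetical, simplified set of operation kinds.
/// This is not the driver's own operation type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op {
    Insert,
    Update,
    Replace,
    Delete,
}

impl Op {
    /// The `operationType` string that identifies this kind of event.
    pub fn wire_name(self) -> &'static str {
        match self {
            Op::Insert => "insert",
            Op::Update => "update",
            Op::Replace => "replace",
            Op::Delete => "delete",
        }
    }
}

/// Client-side equivalent of `{ "$match": { "operationType": wanted } }`:
/// keeps only the events whose operation type equals `wanted`.
pub fn match_operation(events: &[Op], wanted: &str) -> Vec<Op> {
    events
        .iter()
        .copied()
        .filter(|op| op.wire_name() == wanted)
        .collect()
}
```

The sketch filters events after they arrive. A pipeline passed to pipeline() is instead evaluated by the server, so events that do not match are never sent to the application.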
Include Pre-Images and Post-Images
You can configure the change event to contain or omit the following data:
Pre-image: a document that represents the version of the document before the operation, if it exists
Post-image: a document that represents the version of the document after the operation, if it exists
Important
You can enable pre- and post-images on collections only if your deployment uses MongoDB v6.0 or later.
To receive change stream events that include a pre-image or post-image, you must perform the following actions:
1. Enable pre-images and post-images for the collection on your MongoDB deployment.
Tip
To learn how to enable pre- and post-images on your deployment, see Change Streams with Document Pre- and Post-Images in the Server manual.
To learn how to instruct the driver to create a collection with pre-images and post-images enabled, see the Create a Collection with Pre-Image and Post-Images Enabled section of this page.
2. Configure your change stream to retrieve the pre-image, the post-image, or both. During this configuration, you can instruct the driver to require pre- and post-images or to include them only when they are available.
Tip
To configure your change stream to record the pre-image in change events, see the Pre-Image Configuration Example on this page.
To configure your change stream to record the post-image in change events, see the Post-Image Configuration Example on this page.
Create a Collection with Pre-Image and Post-Images Enabled
To enable pre-image and post-image documents for your collection, use the change_stream_pre_and_post_images() option builder method. The following example uses this builder method to specify collection options and creates a collection for which pre- and post-images are available:
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
    .change_stream_pre_and_post_images(enable)
    .await?;
You can change the pre-image and post-image option in an existing collection by running the collMod command from the MongoDB Shell or from your application. To learn how to perform this operation, see the Run a Command guide and the entry on collMod in the Server manual.
Warning
If you enabled pre-images or post-images on a collection, modifying these settings with collMod can cause existing change streams on that collection to terminate.
Pre-Image Configuration Example
To configure a change stream that returns change events containing the pre-image, use the full_document_before_change() option builder method. The following example specifies change stream options and creates a change stream that returns pre-image documents:
let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
    .full_document_before_change(pre_image)
    .await?;
while let Some(event) = change_stream.next().await.transpose()? {
    println!("Operation performed: {:?}", event.operation_type);
    println!("Pre-image: {:?}", event.full_document_before_change);
}
The preceding example passes a value of FullDocumentBeforeChangeType::Required to the full_document_before_change() option builder method. This method configures the change stream to require pre-images for replace, update, and delete change events. If the pre-image is not available, the driver raises an error.
If you update a document in which the name value is "Jane Campion", the change event produces the following output:
Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
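The difference between requiring a pre-image and including it only when available can be summarized in a small decision function. The following is a minimal sketch in plain Rust rather than driver code: PreImageMode and resolve_pre_image() are hypothetical names, and the stored parameter stands for whatever pre-image is available for the changed document.

```rust
/// Hypothetical stand-in for the pre-image modes described in this section.
/// It mirrors the idea of `FullDocumentBeforeChangeType` but is not the driver type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PreImageMode {
    Off,
    WhenAvailable,
    Required,
}

/// Decides what a change event carries as its pre-image.
/// `stored` is the pre-image that is available for the document, if any.
pub fn resolve_pre_image<T>(
    mode: PreImageMode,
    stored: Option<T>,
) -> Result<Option<T>, String> {
    match (mode, stored) {
        // Pre-images were not requested, so none is returned.
        (PreImageMode::Off, _) => Ok(None),
        // Return the pre-image if one exists; otherwise return nothing.
        (PreImageMode::WhenAvailable, stored) => Ok(stored),
        // A pre-image is mandatory, so a missing one is an error.
        (PreImageMode::Required, Some(doc)) => Ok(Some(doc)),
        (PreImageMode::Required, None) => {
            Err("pre-image required but not available".to_string())
        }
    }
}
```

In this model, Required either yields a pre-image or fails, which matches the error behavior described above, while WhenAvailable never fails and yields None when no pre-image exists.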
Post-Image Configuration Example
To configure a change stream that returns change events containing the post-image, use the full_document() option builder method. The following example specifies change stream options and creates a change stream that returns post-image documents:
let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
    .full_document(post_image)
    .await?;
while let Some(event) = change_stream.next().await.transpose()? {
    println!("Operation performed: {:?}", event.operation_type);
    println!("Post-image: {:?}", event.full_document);
}
The preceding example passes a value of FullDocumentType::WhenAvailable to the full_document() option builder method. This method configures the change stream to return post-images for replace, update, and delete change events if the post-image is available.
If you delete the document in which the value of name is "Todd Haynes", the change event produces the following output:
Operation performed: Delete
Post-image: None
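Because the post-image is optional, code that reads it has to handle the None case, as the delete output above shows. The following sketch uses plain Rust to illustrate one way to do so. The Director struct here is an assumption based on the sample data (the integer type of oscar_noms is a guess, and the serialization traits a real model struct would need are omitted), and describe_change() is a hypothetical helper whose parameters stand in for the event's operation_type and full_document fields.

```rust
/// Assumed shape of the `Director` documents from the sample data.
/// The field types are an assumption made for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Director {
    pub name: String,
    pub movies: Vec<String>,
    pub oscar_noms: u32,
}

/// Builds a log line for a change event.
/// `operation` and `post_image` stand in for the event's
/// `operation_type` and `full_document` fields.
pub fn describe_change(operation: &str, post_image: Option<&Director>) -> String {
    match post_image {
        // A post-image is present: report the document's current state.
        Some(d) => format!(
            "{}: {} now has {} nomination(s)",
            operation, d.name, d.oscar_noms
        ),
        // No post-image, as in the delete example above.
        None => format!("{}: no post-image available", operation),
    }
}
```

Matching on the Option keeps the missing-document case explicit instead of unwrapping a value that a delete event does not carry.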
Additional Information
API Documentation
To learn more about any of the methods or types mentioned in this guide, see the following API documentation: