Docs Menu
Docs Home
/ / /
Rust Driver
/ / /

Open Change Streams

On this page

  • Overview
  • Sample Data
  • Open a Change Stream
  • Example
  • Apply Aggregation Operators to your Change Stream
  • Example
  • Include Pre-Images and Post-Images
  • Create a Collection with Pre-Image and Post-Images Enabled
  • Pre-Image Configuration Example
  • Post-Image Configuration Example
  • Additional Information
  • API Documentation

In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a single collection, database, or deployment.

You can open a change stream to help perform the following actions:

  • Enable devices to track and react to events, such as motion-detecting cameras

  • Create an application that monitors changes in stock prices

  • Generate a log of employee activity for specific job positions

You can specify a set of aggregation operators to filter and transform the data that your application receives. When connected to a MongoDB deployment v6.0 or later, you can also configure the events to include the document data before and after the change.

To learn how to open and configure a change stream, navigate to the following sections:

  • Sample Data

  • Open a Change Stream

  • Apply Aggregation Operators to your Change Stream

  • Include Pre-Images and Post-Images

  • Additional Information

The examples in this guide monitor changes to the directors collection. Assume that this collection contains the following documents, which are modelled as structs:

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

Tip

To learn how to insert documents into a collection, see the Insert Documents guide.

You can open a change stream to subscribe to specific types of data changes and produce change events in your application.

To open a change stream, call the watch() method on a Collection, Database, or Client instance.

Important

Standalone MongoDB deployments don't support change streams because the feature requires a replica set oplog. To learn more about the oplog, see the Replica Set Oplog page in the Server manual.

The struct type on which you call the watch() method determines the scope of events that the change stream listens for. The following table describes the behavior of the watch() method based on its calling object:

Struct Type
Behavior of watch()
Collection
Monitors changes to the individual collection
Database
Monitors changes to all collections in the database
Client
Monitors all changes in the connected MongoDB deployment

The following example opens a change stream on the directors collection. The code prints the operation type and modified document of each change event by accessing the operation_type and full_document fields of each ChangeStreamEvent instance:

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

Tip

For a full list of ChangeStreamEvent struct fields, see ChangeStreamEvent in the API documentation.

If you insert a document into the directors collection in which the name value is "Wes Anderson", the preceding code produces the following output:

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

You can chain the pipeline() method to the watch() method to specify which change events the change stream receives. Pass an aggregation pipeline as a parameter to pipeline().

To learn which aggregation operators your MongoDB Server version supports, see Modify Change Stream Output in the Server manual.

The following example creates an aggregation pipeline to filter for update operations. Then, the code passes the pipeline to the pipeline() method, configuring the change stream to only receive and print change events for update operations:

let mut update_change_stream = my_coll.watch()
.pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }])
.await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

If you update the document in which the name value is "Todd Haynes" by increasing the value of the oscar_noms field, the preceding code produces the following output:

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

Tip

To learn how to perform update operations, see the Modify Documents guide.

You can configure the change event to contain or omit the following data:

  • Pre-image: a document that represents the version of the document before the operation, if it exists

  • Post-image: a document that represents the version of the document after the operation, if it exists

Important

You can enable pre- and post-images on collections only if your deployment uses MongoDB v6.0 or later.

To receive change stream events that include a pre-image or post-image, you must perform the following actions:

  • Enable pre-images and post-images for the collection on your MongoDB deployment.

    Tip

    To learn how to enable pre- and post-images on your deployment, see Change Streams with Document Pre- and Post-Images in the Server manual.

    To learn how to instruct the driver to create a collection with pre-images and post-images enabled, see the Create a Collection with Pre-Image and Post-Images Enabled section of this page.

  • Configure your change stream to retrieve either or both the pre-images and post-images. During this configuration, you can instruct the driver to require pre- and post-images or to only include them when available.

    Tip

    To configure your change stream to record the pre-image in change events, see the Pre-Image Configuration Example on this page.

    To configure your change stream to record the post-image in change events, see the Post-Image Configuration Example on this page.

To enable pre-image and post-image documents for your collection, use the change_stream_pre_and_post_images() option builder method. The following example uses this builder method to specify collection options and creates a collection for which pre- and post-images are available:

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
.change_stream_pre_and_post_images(enable)
.await?;

You can change the pre-image and post-image option in an existing collection by running the collMod command from the MongoDB Shell or from your application. To learn how to perform this operation, see the Run a Command guide and the entry on collMod in the Server manual.

Warning

If you enabled pre-images or post-images on a collection, modifying these settings with collMod can cause existing change streams on that collection to terminate.

To configure a change stream that returns change events containing the pre-image, use the full_document_before_change() option builder method. The following example specifies change stream options and creates a change stream that returns pre-image documents:

let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
.full_document_before_change(pre_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

The preceding example passes a value of FullDocumentBeforeChangeType::Required to the full_document_before_change() option builder method. This method configures the change stream to require pre-images for replace, update, and delete change events. If the pre-image is not available, the driver raises an error.

If you update a document in which the name value is "Jane Campion", the change event produces the following output:

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

To configure a change stream that returns change events containing the post-image, use the full_document() option builder method. The following example specifies change stream options and creates a change stream that returns post-image documents:

let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
.full_document(post_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

The preceding example passes a value of FullDocument::WhenAvailable to the full_document() option builder method. This method configures the change stream to return post-images for replace, update, and delete change events if the post-image is available.

If you delete the document in which the value of name is "Todd Haynes", the change event produces the following output:

Operation performed: Delete
Post-image: None

To learn more about any of the methods or types mentioned in this guide, see the following API documentation:

Back

Access Data by Using a Cursor