Explore MongoDB Change Streams
On this page
Follow this tutorial to learn how to create a change stream on a MongoDB collection and observe the change events it creates.
Explore Change Streams
Complete the Tutorial Setup
Complete the steps in the Kafka Connector Tutorial Setup to start the the Confluent Kafka Connect and MongoDB environment.
Connect to the Docker Container
Create two interactive shell sessions on the tutorial Docker Container, each in a separate window.
Run the following command from a terminal to start an interactive shell.
docker exec -it mongo1 /bin/bash
We will refer to this interactive shell as ChangeStreamShell1 throughout this tutorial.
Run the following command in a second terminal to start an interactive shell:
docker exec -it mongo1 /bin/bash
We will refer to this interactive shell as ChangeStreamShell2 throughout this tutorial.
Open a Change Stream
In ChangeStreamShell1, create a Python script to open a change stream using the PyMongo driver.
nano openchangestream.py
Paste the following code into the file and save the changes:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Run the Python script:
python3 openchangestream.py
The script outputs the following message after it starts successfully:
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
Trigger a Change Event
In ChangeStreamShell2, connect to MongoDB using mongosh
, the MongoDB
shell, using the following command:
mongosh "mongodb://mongo1"
After you connect successfully, you should see the following MongoDB shell prompt:
rs0 [direct: primary] test>
At the prompt, type the following commands:
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
After entering the preceding commands, switch to ChangeStreamShell1 to view the change stream output, which should resemble the following:
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
To stop the script, press Ctrl+C
.
By the end of this step, you've successfully triggered and observed a change stream event.
Open a Filtered Change Stream
You can apply a filter to a change stream by passing it an aggregation pipeline.
In ChangeStreamShell1, create a new Python script to open a filtered change stream using the PyMongo driver.
nano pipeline.py
Paste the following code into the file and save the changes:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Run the Python script:
python3 pipeline.py
The script outputs the following message after it starts successfully:
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
Observe the Filtered Change Stream
Return to your ChangeStreamShell2 session which should be connected to
MongoDB using mongosh
.
At the prompt, type the following commands:
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
As indicated by the script output, the change stream creates a change event because it matches the following pipeline:
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
Try inserting the following documents in in ChangeStreamShell2 to verify the change stream only produces events when the documents match the filter:
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(Optional) Stop the Docker Containers
After you complete this tutorial, free resources on your computer by stopping or removing Docker assets. You can choose to remove both the Docker containers and images, or exclusively the containers. If you remove the containers and images, you must download them again to restart your MongoDB Kafka Connector development environment, which is approximately 2.4 GB in size. If you exclusively remove the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.
Tip
More Tutorials
If you plan to complete any more MongoDB Kafka Connector tutorials, consider removing only containers. If you don't plan to complete any more MongoDB Kafka Connector tutorials, consider removing containers and images.
Select the tab that corresponds to the removal task you want to run.
Run the following shell command to remove the Docker containers and images for the development environment:
docker-compose -p mongo-kafka down --rmi all
Run the following shell command to remove the Docker containers but keep the images for the development environment:
docker-compose -p mongo-kafka down
To restart the containers, follow the same steps required to start them in the Tutorial Setup.
Summary
In this tutorial, you created a change stream on MongoDB and observed the output. The MongoDB Kafka source connector reads the change events from a change stream that you configure, and writes them to a Kafka topic.
To learn how to configure a change stream and Kafka topic for a source connector, proceed to the Getting Started with the MongoDB Kafka Source Connector tutorial.
Learn More
Read the following resources to learn more about concepts mentioned in this tutorial: