
Explore MongoDB Change Streams


Follow this tutorial to learn how to create a change stream on a MongoDB collection and observe the change events it creates.

1

Complete the steps in the Kafka Connector Tutorial Setup to start the Confluent Kafka Connect and MongoDB environment.

2

Create two interactive shell sessions on the tutorial Docker Container, each in a separate window.

Run the following command from a terminal to start an interactive shell:

docker exec -it mongo1 /bin/bash

We will refer to this interactive shell as ChangeStreamShell1 throughout this tutorial.

Run the following command in a second terminal to start an interactive shell:

docker exec -it mongo1 /bin/bash

We will refer to this interactive shell as ChangeStreamShell2 throughout this tutorial.

3

In ChangeStreamShell1, create a Python script to open a change stream using the PyMongo driver.

nano openchangestream.py

Paste the following code into the file and save the changes:

import pymongo
from bson.json_util import dumps

client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')

with db.orders.watch() as stream:
    print('\nChange Stream is opened on the Tutorial1.orders namespace. Currently watching ...\n\n')
    for change in stream:
        print(dumps(change, indent=2))

Run the Python script:

python3 openchangestream.py

The script outputs the following message after it starts successfully:

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

In ChangeStreamShell2, connect to MongoDB using mongosh, the MongoDB Shell, by running the following command:

mongosh "mongodb://mongo1"

After you connect successfully, you should see the following MongoDB shell prompt:

rs0 [direct: primary] test>

At the prompt, type the following commands:

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

After entering the preceding commands, switch to ChangeStreamShell1 to view the change stream output, which should resemble the following:

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}
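Each change event is a document with a fixed shape. As a local illustration (a sketch using a hard-coded event, separate from the tutorial environment), the fields you most often care about can be pulled out of an event like the one above:

```python
# A sample change event shaped like the output above (values abbreviated).
event = {
    "_id": {"_data": "826264..."},
    "operationType": "insert",
    "fullDocument": {"_id": "...", "test": 1},
    "ns": {"db": "Tutorial1", "coll": "orders"},
    "documentKey": {"_id": "..."},
}

# The namespace and operation type identify where the change happened and
# what kind of change it was; fullDocument carries the complete inserted
# document for insert events.
namespace = f'{event["ns"]["db"]}.{event["ns"]["coll"]}'
print(namespace, event["operationType"])  # Tutorial1.orders insert
print(event["fullDocument"]["test"])      # 1
```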

To stop the script, press Ctrl+C.

By the end of this step, you've successfully triggered and observed a change stream event.

5

You can apply a filter to a change stream by passing it an aggregation pipeline.

In ChangeStreamShell1, create a new Python script to open a filtered change stream using the PyMongo driver.

nano pipeline.py

Paste the following code into the file and save the changes:

import pymongo
from bson.json_util import dumps

client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')

pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

with db.sensors.watch(pipeline=pipeline) as stream:
    print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
    for change in stream:
        print(dumps(change, indent=2))

Run the Python script:

python3 pipeline.py

The script outputs the following message after it starts successfully:

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

Return to your ChangeStreamShell2 session, which should still be connected to MongoDB through mongosh.

At the prompt, type the following commands:

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

As the script output indicates, the change stream creates a change event because the inserted document matches the following pipeline:

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

Try inserting the following documents in ChangeStreamShell2 to verify that the change stream produces events only when the documents match the filter:

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
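To see why only the first sensor document produces an event, the pipeline's $match condition can be sketched as a plain Python predicate. This is a local illustration of the filter logic only, not part of the tutorial environment:

```python
# A sketch of the pipeline's $match condition: a change event passes the
# filter only when fullDocument.type is "temp" AND fullDocument.value
# is greater than or equal to 100 ($gte).
def matches_filter(full_document):
    return (
        full_document.get("type") == "temp"
        and full_document.get("value", float("-inf")) >= 100
    )

# The documents inserted in ChangeStreamShell2:
print(matches_filter({"type": "temp", "value": 101}))     # matches
print(matches_filter({"type": "temp", "value": 99}))      # value below 100
print(matches_filter({"type": "pressure", "value": 22}))  # wrong type
```

Note that because the filter uses $gte, a document with a value of exactly 100 would also produce a change event.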
7

After you complete this tutorial, free resources on your computer by stopping or removing Docker assets. You can remove both the Docker containers and images, or only the containers. If you remove the containers and images, you must download them again, approximately 2.4 GB, to restart your MongoDB Kafka Connector development environment. If you remove only the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.

Tip

More Tutorials

If you plan to complete any more MongoDB Kafka Connector tutorials, consider removing only the containers. If you don't plan to complete any more tutorials, consider removing both the containers and images.

Select the tab that corresponds to the removal task you want to run.

Run the following shell command to remove the Docker containers and images for the development environment:

docker-compose -p mongo-kafka down --rmi all

Run the following shell command to remove the Docker containers but keep the images for the development environment:

docker-compose -p mongo-kafka down

To restart the containers, follow the same steps required to start them in the Tutorial Setup.

In this tutorial, you created a change stream on MongoDB and observed the output. The MongoDB Kafka source connector reads the change events from a change stream that you configure, and writes them to a Kafka topic.
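As a sketch of how this tutorial connects to the source connector, a configuration like the following tells the connector which namespace to watch and which aggregation pipeline to apply. This is an illustrative example: the property names are the connector's documented settings, but the connector name and values here are assumptions based on this tutorial's environment.

```json
{
  "name": "mongo-tutorial-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "Tutorial1",
    "collection": "sensors",
    "pipeline": "[ { \"$match\": { \"$and\": [ { \"fullDocument.type\": \"temp\" }, { \"fullDocument.value\": { \"$gte\": 100 } } ] } } ]"
  }
}
```

With a configuration like this, the connector opens the same kind of filtered change stream you created by hand in this tutorial and publishes each matching change event to a Kafka topic.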

To learn how to configure a change stream and Kafka topic for a source connector, proceed to the Getting Started with the MongoDB Kafka Source Connector tutorial.

Read the following resources to learn more about concepts mentioned in this tutorial:

  • Change Streams and the Source Connector

  • Modify Change Stream Output

  • MongoDB Shell (mongosh)
