Replicate Data with a Change Data Capture Handler
Overview
Follow this tutorial to learn how to use a change data capture (CDC) handler to replicate data with the MongoDB Kafka Connector. A CDC handler is an application that translates CDC events into MongoDB write operations. Use a CDC handler when you need to reproduce the changes made in one datastore in another datastore.
In this tutorial, you configure and run MongoDB Kafka source and sink connectors to make two MongoDB collections contain the same documents using CDC. The source connector writes change stream data from the original collection to a Kafka topic and the sink connector writes the Kafka topic data to the target MongoDB collection.
If you want to learn more about how CDC handlers work, see the Change Data Capture Handlers guide.
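To make the idea of translating CDC events into write operations concrete, the following is a minimal sketch in plain JavaScript. It is not the connector's implementation: the names toWriteOperation and applyWrite are hypothetical, an in-memory Map stands in for the target collection, and only insert, replace, and delete events are handled.

```javascript
// Hypothetical sketch: map a change stream event to a write operation
// description shaped like a MongoDB bulk write model.
function toWriteOperation(event) {
  switch (event.operationType) {
    case "insert":
    case "replace":
      // Upsert the full document so that replaying an event is harmless.
      return {
        replaceOne: {
          filter: { _id: event.documentKey._id },
          replacement: event.fullDocument,
          upsert: true,
        },
      };
    case "delete":
      return { deleteOne: { filter: { _id: event.documentKey._id } } };
    default:
      // Other event types are out of scope for this sketch.
      return null;
  }
}

// Apply a write operation to a Map that stands in for the target collection.
function applyWrite(target, op) {
  if (op === null) return;
  if (op.replaceOne) target.set(op.replaceOne.filter._id, op.replaceOne.replacement);
  if (op.deleteOne) target.delete(op.deleteOne.filter._id);
}

const target = new Map();
const events = [
  { operationType: "insert", documentKey: { _id: 1 }, fullDocument: { _id: 1, proclaim: "Hello World!" } },
  { operationType: "delete", documentKey: { _id: 1 } },
];

applyWrite(target, toWriteOperation(events[0]));
console.log(target.size); // 1 (the document was replicated)
applyWrite(target, toWriteOperation(events[1]));
console.log(target.size); // 0 (the delete was replicated)
```

The sketch uses an upsert for inserts so that processing the same event twice leaves the target unchanged, which is a common choice when messages may be redelivered.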
Replicate Data with a CDC Handler
Complete the Tutorial Setup
Complete the steps in the Kafka Connector Tutorial Setup to start the Confluent Kafka Connect and MongoDB environment.
Start Interactive Shells
Start two interactive shells on the Docker container in separate windows. In the tutorial, you can use the shells to run and observe different tasks.
Run the following command from a terminal to start an interactive shell:
docker exec -it mongo1 /bin/bash
We will refer to this interactive shell as CDCShell1 throughout this tutorial.
Run the following command in a second terminal to start an interactive shell:
docker exec -it mongo1 /bin/bash
We will refer to this interactive shell as CDCShell2 throughout this tutorial.
Arrange the two windows on your screen so that both stay visible and you can watch updates in real time.
Use CDCShell1 to configure your connectors and monitor your Kafka topic. Use CDCShell2 to perform write operations in MongoDB.
Configure the Source Connector
In CDCShell1, configure a source connector to read from the CDCTutorial.Source MongoDB namespace and write to the CDCTutorial.Source Kafka topic.
Create a configuration file called cdc-source.json using the following command:
nano cdc-source.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-cdc-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "CDCTutorial",
    "collection": "Source"
  }
}
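The configuration does not name a Kafka topic, yet the connector writes to CDCTutorial.Source. The sketch below shows how that name can be derived from the namespace. It assumes the connector's default topic naming, in which the database and collection names are joined with a period and the optional topic.prefix and topic.suffix settings are added when present. The helper topicFor is hypothetical and exists only for illustration.

```javascript
// The source connector configuration from this tutorial.
const sourceConnector = {
  name: "mongo-cdc-source",
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    database: "CDCTutorial",
    collection: "Source",
  },
};

// Hypothetical helper: derive the topic name from the namespace, assuming
// default naming with optional "topic.prefix" and "topic.suffix" settings.
function topicFor(config) {
  return [config["topic.prefix"], config.database, config.collection, config["topic.suffix"]]
    .filter((part) => part !== undefined && part !== "")
    .join(".");
}

console.log(topicFor(sourceConnector.config)); // CDCTutorial.Source
```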
Run the following command in CDCShell1 to start the source connector using the configuration file you created:
cx cdc-source.json
Note
The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
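If you prefer to send the same request from code, the curl call above can be sketched in JavaScript. This example assumes Node.js 18 or later (for the built-in fetch function) running somewhere that can resolve the connect hostname. The function names buildCreateConnectorRequest and createConnector are hypothetical. The block only builds and prints the request; it does not contact Kafka Connect unless you call createConnector yourself.

```javascript
// Build the same request that the curl command sends.
function buildCreateConnectorRequest(connectorConfig, baseUrl = "http://connect:8083") {
  return {
    url: `${baseUrl}/connectors`,
    options: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(connectorConfig),
    },
  };
}

// Send the request. Not invoked here, because it needs a reachable
// Kafka Connect worker.
async function createConnector(connectorConfig) {
  const { url, options } = buildCreateConnectorRequest(connectorConfig);
  const response = await fetch(url, options);
  if (!response.ok) {
    throw new Error(`Kafka Connect returned HTTP ${response.status}`);
  }
  return response.json();
}

const request = buildCreateConnectorRequest({
  name: "mongo-cdc-source",
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    database: "CDCTutorial",
    collection: "Source",
  },
});

console.log(request.options.method, request.url); // POST http://connect:8083/connectors
```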
Run the following command in the shell to check the status of the connectors:
status
If your source connector started successfully, you should see the following output:
Kafka topics:
...
The status of the connectors:

source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

Currently configured connectors

[
"mongo-cdc-source"
]
...
Configure the Sink Connector
In CDCShell1, configure a sink connector to copy data from the CDCTutorial.Source Kafka topic to the CDCTutorial.Destination MongoDB namespace.
Create a configuration file called cdc-sink.json using the following command:
nano cdc-sink.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-cdc-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "CDCTutorial.Source",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "connection.uri": "mongodb://mongo1",
    "database": "CDCTutorial",
    "collection": "Destination"
  }
}
Run the following command in the shell to start the sink connector using the configuration file you created:
cx cdc-sink.json
Run the following command in the shell to check the status of the connectors:
status
If your sink connector started successfully, you should see the following output:
Kafka topics:
...
The status of the connectors:

sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

Currently configured connectors

[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
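The Kafka Connect REST API also reports connector health at GET /connectors/<name>/status, returning the state of the connector and of each of its tasks. The following sketch checks such a response in JavaScript. The helper isFullyRunning is hypothetical, and the sample object is hand-written to show the response shape rather than captured from the tutorial environment; in particular, the worker_id values are placeholders.

```javascript
// Hypothetical helper: true only when the connector and all of its tasks
// report the RUNNING state.
function isFullyRunning(status) {
  const connectorOk = status.connector.state === "RUNNING";
  const tasksOk =
    status.tasks.length > 0 && status.tasks.every((task) => task.state === "RUNNING");
  return connectorOk && tasksOk;
}

// Hand-written sample in the shape of a Kafka Connect status response.
const sampleStatus = {
  name: "mongo-cdc-sink",
  connector: { state: "RUNNING", worker_id: "placeholder:8083" },
  tasks: [{ id: 0, state: "RUNNING", worker_id: "placeholder:8083" }],
  type: "sink",
};

console.log(isFullyRunning(sampleStatus)); // true
```

Requiring at least one task is a deliberate choice in this sketch: a connector with no tasks cannot move any data even if the connector itself reports RUNNING.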
Monitor the Kafka Topic
In CDCShell1, monitor the Kafka topic for incoming events. Run the following command to start the kafkacat application, which outputs data published to the topic:
kc CDCTutorial.Source
Note
The kc command is a custom script included in the tutorial development environment that calls the kafkacat application with options to connect to Kafka and format the output of the specified topic.
Once started, you should see the following output that indicates there is currently no data to read:
% Reached end of topic CDCTutorial.Source [0] at offset 0
Write Data into the Source and Watch the Data Flow
In CDCShell2, connect to MongoDB using mongosh, the MongoDB shell, by running the following command:
mongosh "mongodb://mongo1"
After you connect successfully, you should see the following MongoDB shell prompt:
rs0 [direct: primary] test>
At the prompt, type the following commands to insert a new document into the CDCTutorial.Source MongoDB namespace:
use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });
Once MongoDB completes the insert command, you should receive an acknowledgment that resembles the following text:
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
The source connector picks up the change and publishes it to the Kafka topic. You should see the following topic message in your CDCShell1 window:
{
  "schema": { "type": "string", "optional": false },
  "payload": {
    "_id": { "_data": "8260..." },
    "operationType": "insert",
    "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
    "wallTime": { "$date": "..." },
    "fullDocument": {
      "_id": { "$oid": "600b38ad..." },
      "proclaim": "Hello World!"
    },
    "ns": { "db": "CDCTutorial", "coll": "Source" },
    "documentKey": { "_id": { "$oid": "600b38a..." } }
  }
}
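The fields a consumer usually cares about in a message like this are operationType, ns, and fullDocument. The following sketch pulls them out in JavaScript. The helper summarizeChangeEvent is hypothetical, and the sample message is a trimmed, hand-written version of the output above. Because the schema type is string, the sketch assumes the payload may arrive either as a JSON string or as an already parsed object and accepts both.

```javascript
// Hypothetical helper: extract the key facts from a topic message.
function summarizeChangeEvent(message) {
  // Accept a payload that is either a JSON string or a parsed object.
  const payload =
    typeof message.payload === "string" ? JSON.parse(message.payload) : message.payload;
  return {
    operation: payload.operationType,
    namespace: `${payload.ns.db}.${payload.ns.coll}`,
    hasFullDocument: payload.fullDocument !== undefined,
  };
}

// Trimmed, hand-written sample based on the message shown in this tutorial.
const sampleMessage = {
  schema: { type: "string", optional: false },
  payload: {
    operationType: "insert",
    fullDocument: { proclaim: "Hello World!" },
    ns: { db: "CDCTutorial", coll: "Source" },
  },
};

const summary = summarizeChangeEvent(sampleMessage);
console.log(summary.operation, summary.namespace); // insert CDCTutorial.Source
```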
The sink connector picks up the Kafka message and sinks the data into MongoDB. You can retrieve the document from the CDCTutorial.Destination namespace in MongoDB by running the following command in the MongoDB shell you started in CDCShell2:
db.Destination.find()
You should see the following document returned as the result:
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World!' } ]
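To check replication more systematically than by reading the output, you can compare the contents of the two collections. The following sketch does this for two arrays of documents in plain JavaScript. The helper sameDocuments is hypothetical, the sample arrays use simplified string _id values instead of ObjectId, and the comparison treats field order as significant.

```javascript
// Hypothetical helper: true when both arrays hold the same documents,
// regardless of the order in which the documents are listed.
function sameDocuments(sourceDocs, destinationDocs) {
  if (sourceDocs.length !== destinationDocs.length) return false;
  // Serialize each document and sort so that list order does not matter.
  const canonical = (docs) => docs.map((doc) => JSON.stringify(doc)).sort();
  const left = canonical(sourceDocs);
  const right = canonical(destinationDocs);
  return left.every((value, index) => value === right[index]);
}

// Simplified sample data standing in for the two collections.
const sourceDocs = [{ _id: "a1", proclaim: "Hello World!" }];
const destinationDocs = [{ _id: "a1", proclaim: "Hello World!" }];

console.log(sameDocuments(sourceDocs, destinationDocs)); // true
console.log(sameDocuments(sourceDocs, [])); // false
```

In mongosh, you could try passing the results of db.Source.find().toArray() and db.Destination.find().toArray() to a function like this one, although how BSON types such as ObjectId serialize there is an assumption this sketch does not verify.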
(Optional) Generate Additional Changes
Try removing documents from the CDCTutorial.Source namespace by running the following command from the MongoDB shell:
db.Source.deleteMany({})
You should see the following topic message in your CDCShell1 window:
{
  "schema": { "type": "string", "optional": false },
  "payload": {
    "_id": { "_data": "8261...." },
    ...
    "operationType": "delete",
    "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
    "ns": { "db": "CDCTutorial", "coll": "Source" },
    "documentKey": { "_id": { "$oid": "6138..." } }
  }
}
Run the following command to retrieve the current number of documents in the collection:
db.Destination.countDocuments({})
This returns the following output, indicating the collection is empty:
0
Run the following command to exit the MongoDB shell:
exit
Summary
In this tutorial, you set up a source connector to capture changes to a MongoDB collection and send them to Apache Kafka. You also configured a sink connector with a MongoDB CDC Handler to move the data from Apache Kafka to a MongoDB collection.
Learn More
Read the following resources to learn more about concepts mentioned in this tutorial: