Getting Started with the MongoDB Kafka Source Connector
Follow this tutorial to learn how to configure a MongoDB Kafka source connector to read data from a change stream and publish it to an Apache Kafka topic.
Complete the Tutorial Setup
Complete the steps in the Kafka Connector Tutorial Setup to start the Confluent Kafka Connect and MongoDB environment.
Configure the Source Connector
Create an interactive shell session on the tutorial Docker container downloaded for the Tutorial Setup using the following command:
docker exec -it mongo1 /bin/bash
Create a source configuration file called simplesource.json with the following command:
nano simplesource.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "Tutorial1",
    "collection": "orders"
  }
}
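If you prefer to generate the file rather than paste it, the same configuration could be produced by a short script. The following is a minimal Python sketch, assuming Python is available in your shell (the tutorial does not state this). The helper name `build_source_config` is hypothetical; the property names and values are the ones shown above.

```python
import json


def build_source_config(name, database, collection, uri="mongodb://mongo1"):
    """Return a connector definition shaped like simplesource.json."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": uri,
            "database": database,
            "collection": collection,
        },
    }


def to_json_text(config):
    """Serialize the connector definition as indented JSON text."""
    return json.dumps(config, indent=2)
```

Writing the result of `to_json_text(build_source_config("mongo-simple-source", "Tutorial1", "orders"))` to simplesource.json should give a file equivalent to the one pasted above.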
Run the following command in the shell to start the source connector using the configuration file you created:
cx simplesource.json
Note
The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
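The same request can be expressed in code, which may help if you later automate connector creation. This is a minimal sketch using only the Python standard library; the function name `create_connector_request` is hypothetical, and the URL is the one from the curl command above. The function builds the request without sending it, so you can inspect it first.

```python
import json
import urllib.request


def create_connector_request(config, base_url="http://connect:8083"):
    """Build (but do not send) the POST request that creates a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# To send the request from a host that can resolve "connect":
#     with urllib.request.urlopen(create_connector_request(config)) as resp:
#         print(resp.status, resp.read().decode("utf-8"))
```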
Run the following command in the shell to check the status of the connectors:
status
If your source connector started successfully, you should see the following output:
Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
  "mongo-simple-source"
]
...
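The internals of the status helper are not shown in this tutorial. If you want to check a connector programmatically, one option is the Kafka Connect REST API status endpoint, GET /connectors/<name>/status. The sketch below assumes the usual response shape, with a "connector" object and a "tasks" list that each carry a "state" field; the function names are hypothetical.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://connect:8083"):
    """Fetch a connector's status document from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is treated as not yet running.
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )
```

Calling `is_running(fetch_status("mongo-simple-source"))` from a host that can reach the Connect worker would then correspond to the two RUNNING columns in the output above, assuming those columns report the connector and task states.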
Create Change Events
In the same shell, connect to MongoDB using mongosh, the MongoDB shell, by running the following command:
mongosh "mongodb://mongo1"
After you connect successfully, you should see the following MongoDB shell prompt:
rs0 [direct: primary] test>
At the prompt, type the following commands to insert a new document:
use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
Once MongoDB completes the insert command, you should receive an acknowledgment that resembles the following text:
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
Exit the MongoDB shell by entering the command exit.
Check the status of your Kafka environment using the following command:
status
In the output of the preceding command, you should see the new topic that the source connector created after receiving the change event:
... "topic": "Tutorial1.orders", ...
Confirm the content of data on the new Kafka topic by running the following command:
kc Tutorial1.orders
Note
The kc command is a helper script that outputs the content of a Kafka topic.
When you run the preceding command, you should see the Kafka topic data organized into "Key" and "Value" sections.
In the "Value" section of the output, the payload includes the fullDocument data, as shown in the following formatted JSON document:
{
  "_id": { "_data": "8262655A..." },
  "operationType": "insert",
  "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } },
  "wallTime": { "$date": "2022-10-13T17:06:23.409Z" },
  "fullDocument": {
    "_id": { "$oid": "62655a..." },
    "order_id": 1,
    "item": "coffee"
  },
  "ns": { "db": "Tutorial1", "coll": "orders" },
  "documentKey": { "_id": { "$oid": "62655a..." } }
}
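A consumer that only needs the inserted document has to dig it out of this event. The following Python sketch shows one way to do that. It assumes the message value is a JSON object with a "payload" field, and that the payload may arrive either as a nested object or as a JSON-encoded string; neither detail is confirmed by this tutorial, so check it against your own kc output. The function name `full_document` is hypothetical.

```python
import json


def full_document(value_json):
    """Return the fullDocument from a change event message value, or None."""
    value = json.loads(value_json)
    # Fall back to the value itself if there is no "payload" wrapper.
    payload = value.get("payload", value)
    # Assumption: the payload may be a JSON-encoded string rather than an object.
    if isinstance(payload, str):
        payload = json.loads(payload)
    return payload.get("fullDocument")
```

For the event shown above, the function would return the object that holds the order_id and item fields.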
Reconfigure the Change Stream
You can omit the metadata from the events created by the change stream by configuring the connector to return only the fullDocument field.
Stop the connector using the following command:
del mongo-simple-source
Note
The del command is a helper script that calls the Kafka Connect REST API to stop the connector and is equivalent to the following command:
curl -X DELETE connect:8083/connectors/<parameter>
Edit the source configuration file called simplesource.json with the following command:
nano simplesource.json
Remove the existing configuration, add the following configuration, and save the file:
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "publish.full.document.only": true,
    "database": "Tutorial1",
    "collection": "orders"
  }
}
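The only difference from the first configuration is the publish.full.document.only property. As an illustration, the change could be applied to an existing definition in code instead of retyping the file. This is a minimal Python sketch with a hypothetical helper name, `with_full_document_only`; it copies the definition so the original stays unchanged.

```python
import copy


def with_full_document_only(config, enabled=True):
    """Return a copy of a connector definition with publish.full.document.only set."""
    updated = copy.deepcopy(config)
    updated["config"]["publish.full.document.only"] = enabled
    return updated
```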
Run the following command in the shell to start the source connector using the configuration file you updated:
cx simplesource.json
Connect to MongoDB using mongosh by running the following command:
mongosh "mongodb://mongo1"
At the prompt, type the following commands to insert a new document:
use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
Exit mongosh by running the following command:
exit
Confirm the content of the Kafka topic by running the following command:
kc Tutorial1.orders
The payload field in the "Value" document should contain only the following document data:
{
  "_id": { "$oid": "<your _id value>" },
  "order_id": 2,
  "item": "oatmeal"
}
(Optional) Stop the Docker Containers
After you complete this tutorial, free resources on your computer by stopping or removing Docker assets. You can remove both the Docker containers and images, or only the containers. If you remove the containers and images, you must download them again to restart your MongoDB Kafka Connector development environment, which is approximately 2.4 GB in size. If you remove only the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.
Tip
More Tutorials
If you plan to complete any more MongoDB Kafka Connector tutorials, consider removing only containers. If you don't plan to complete any more MongoDB Kafka Connector tutorials, consider removing containers and images.
Choose the removal task you want to run from the following two options.
Run the following shell command to remove the Docker containers and images for the development environment:
docker-compose -p mongo-kafka down --rmi all
Run the following shell command to remove the Docker containers but keep the images for the development environment:
docker-compose -p mongo-kafka down
To restart the containers, follow the same steps required to start them in the Tutorial Setup.
Summary
In this tutorial, you started a source connector using different configurations to alter the change stream event data published to a Kafka topic.
Learn More
Read the following resources to learn more about concepts mentioned in this tutorial: