Copy Existing Data
This usage example demonstrates how to copy data from a MongoDB collection to an Apache Kafka topic using the MongoDB Kafka source connector.
Example
Suppose you want to copy a MongoDB collection to Apache Kafka and filter some data.
Your requirements and your solutions are as follows:
Requirement | Solution |
---|---|
Copy the customers collection of the shopping database onto an Apache Kafka topic. | See the Copy Data section of this guide. |
Only copy documents that have the value "Mexico" in the country field. | See the Filter Data section of this guide. |
The customers collection contains the following documents:
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
Copy Data
Copy the contents of the customers collection of the shopping database by specifying the following configuration options in your source connector:
```properties
database=shopping
collection=customers
startup.mode=copy_existing
```
Your source connector copies your collection by creating change event documents that describe inserting each document into your collection.
Note
Data Copy Can Produce Duplicate Events
If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.
To learn more about change event documents, see the Change Streams guide.
To learn more about the startup.mode option, see Startup Properties.
Filter Data
You can filter data by specifying an aggregation pipeline in the startup.mode.copy.existing.pipeline option of your source connector configuration. The following configuration specifies an aggregation pipeline that matches all documents with "Mexico" in the country field:
```properties
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
```
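Before starting the connector, you can preview which documents the pipeline would match by running the same $match stage directly against the collection. The following is a minimal PyMongo sketch; the connection URI is a placeholder:

```python
# Hedged sketch: run the copy-existing pipeline as a plain aggregation
# to preview the documents the connector would copy.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed local deployment
customers = client["shopping"]["customers"]

pipeline = [{"$match": {"country": "Mexico"}}]
for doc in customers.aggregate(pipeline):
    print(doc)
# With the sample data, this prints only the document with _id 1.
```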
To learn more about the startup.mode.copy.existing.pipeline option, see Startup Properties.
To learn more about aggregation pipelines, see the following resources:
- Customize a Pipeline to Filter Change Events Usage Example
- Aggregation in the MongoDB manual
Specify the Configuration
Your final source connector configuration to copy the customers collection should look like this:
```properties
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
```
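How you apply this configuration depends on how you run Kafka Connect. As one example, if your Connect worker exposes its REST API on localhost:8083, a sketch like the following registers the connector; the worker address and the connector name "copy-existing-source" are assumptions, not values from this guide:

```python
# Hedged sketch: register the source connector through the Kafka Connect
# REST API. The worker URL and connector name are placeholders.
import json

import requests

config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "<your production MongoDB connection uri>",
    "database": "shopping",
    "collection": "customers",
    "startup.mode": "copy_existing",
    "startup.mode.copy.existing.pipeline": '[{ "$match": { "country": "Mexico" } }]',
}

response = requests.post(
    "http://localhost:8083/connectors",  # assumed Connect worker address
    headers={"Content-Type": "application/json"},
    data=json.dumps({"name": "copy-existing-source", "config": config}),
)
response.raise_for_status()
print(response.json())
```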
Once your connector copies your data, you see the following change event document, corresponding to the preceding sample collection, in the shopping.customers Apache Kafka topic:
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
Note
Write the Data in your Topic into a Collection
Use a change data capture handler to convert change event documents in an Apache Kafka topic into MongoDB write operations. To learn more, see the Change Data Capture Handlers guide.