Migrate an Existing Collection to a Time Series Collection
Follow this tutorial to learn how to convert an existing MongoDB collection to a time series collection using the MongoDB Kafka Connector.
Time series collections efficiently store time series data. Time series data consists of measurements taken at time intervals, metadata that describes the measurement, and the time of the measurement.
To convert data from a MongoDB collection to a time series collection using the connector, you must perform the following tasks:
Identify the time field common to all documents in the collection.
Configure a source connector to copy the existing collection data to a Kafka topic.
Configure a sink connector to copy the Kafka topic data to the time series collection.
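As a rough illustration of the first task, the following Python sketch reports which fields hold a date value in every document of a sample. The function name candidate_time_fields and the sample documents (modeled on the generator output shown later) are hypothetical. The sketch assumes the documents were read with a driver such as PyMongo, which returns BSON dates as datetime.datetime objects.

```python
from datetime import datetime
from typing import Any, Iterable, List, Mapping


def candidate_time_fields(docs: Iterable[Mapping[str, Any]]) -> List[str]:
    """Return the fields that hold a datetime value in every document."""
    rows = list(docs)
    if not rows:
        return []
    # Start with every key of the first document, then keep only the keys
    # whose value is a datetime in each document (including the first).
    candidates = set(rows[0])
    for doc in rows:
        candidates &= {k for k, v in doc.items() if isinstance(v, datetime)}
    return sorted(candidates)


# Hypothetical sample documents modeled on the generator output.
sample = [
    {"company_symbol": "MSP", "price": 31.08, "tx_time": datetime(2022, 5, 25, 21, 15, 15)},
    {"company_symbol": "RWH", "price": 18.42, "tx_time": datetime(2022, 5, 25, 21, 15, 15)},
]
print(candidate_time_fields(sample))  # ['tx_time']
```

With PyMongo, you could pass a bounded sample such as collection.find().limit(1000) instead of the hand-written list.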
In this tutorial, you perform these tasks to migrate stock data from a collection to a time series collection. The time series collection stores and indexes the data more efficiently while retaining the ability to analyze stock performance over time using aggregation operators.
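For example, once the data is in the time series collection, a pipeline like the following hypothetical one computes an hourly average price per symbol. It is written as a Python list so it could be passed to PyMongo's Collection.aggregate(). The field names symbol, price, and tx_time are assumed from the sample StockDataMigrate document shown in the verification step, and $dateTrunc requires MongoDB 5.0 or later.

```python
from typing import Any, Dict, List

# Hypothetical pipeline: average price per symbol for each hour.
# Field names are assumed from the sample StockDataMigrate document.
hourly_avg_pipeline: List[Dict[str, Any]] = [
    {
        "$group": {
            "_id": {
                "symbol": "$symbol",
                # Round each trade time down to the start of its hour (MongoDB 5.0+).
                "hour": {"$dateTrunc": {"date": "$tx_time", "unit": "hour"}},
            },
            "avg_price": {"$avg": "$price"},
            "trades": {"$sum": 1},
        }
    },
    # Order results chronologically, then by symbol.
    {"$sort": {"_id.hour": 1, "_id.symbol": 1}},
]

# With PyMongo (not executed here):
#   db["StockDataMigrate"].aggregate(hourly_avg_pipeline)
```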
Migrate a Collection to a Time Series Collection
Complete the Tutorial Setup
Complete the steps in the Kafka Connector Tutorial Setup to start the Confluent Kafka Connect and MongoDB environment.
Generate Sample Data
Run the following command to start a script in your Docker environment that generates a sample collection containing fabricated stock symbols and their prices in your tutorial MongoDB replica set:
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
Once the data generator starts running, you should see generated data that resembles the following:
...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
Configure the Source Connector
In a separate terminal window, create an interactive shell session on the tutorial Docker container downloaded for the Tutorial Setup using the following command:
docker exec -it mongo1 /bin/bash
Create a source configuration file called stock-source.json with the following command:
nano stock-source.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-source-marketdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "mongodb://mongo1",
    "topic.prefix": "marketdata",
    "database": "Stocks",
    "collection": "PriceData",
    "copy.existing": "true"
  }
}
This configuration instructs the connector to copy the existing data from the PriceData MongoDB collection to the marketdata.Stocks.PriceData Kafka topic and, once the copy is complete, any future data inserted into that collection.
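If you prefer to generate the configuration file rather than paste it, a minimal Python sketch could build the same settings and derive the topic name the tutorial expects. The helpers source_topic_name and write_config are hypothetical; source_topic_name only models the default prefix.database.collection naming and ignores the connector's other topic-naming options.

```python
import json

# The same settings as stock-source.json above.
source_config = {
    "name": "mongo-source-marketdata",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "publish.full.document.only": "true",
        "connection.uri": "mongodb://mongo1",
        "topic.prefix": "marketdata",
        "database": "Stocks",
        "collection": "PriceData",
        "copy.existing": "true",
    },
}


def source_topic_name(cfg: dict) -> str:
    """Join topic.prefix, database, and collection with dots."""
    c = cfg["config"]
    return ".".join([c["topic.prefix"], c["database"], c["collection"]])


def write_config(cfg: dict, path: str) -> None:
    """Write the connector config as pretty-printed JSON."""
    with open(path, "w") as f:
        json.dump(cfg, f, indent=2)


print(source_topic_name(source_config))  # marketdata.Stocks.PriceData
```

Calling write_config(source_config, "stock-source.json") would produce a file equivalent to the one you created with nano.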
Run the following command in the shell to start the source connector using the configuration file you created:
cx stock-source.json
Note
The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
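For readers who want to script this step outside the cx helper, the following Python sketch sends the same request with the standard library's urllib. The helper names are hypothetical, and the host connect:8083 only resolves inside the tutorial's Docker network.

```python
import json
import urllib.request

# Endpoint taken from the curl command above; resolves only inside the
# tutorial's Docker network.
CONNECT_URL = "http://connect:8083/connectors"


def build_create_request(body: bytes, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a POST request equivalent to the curl call above."""
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config_path: str, url: str = CONNECT_URL) -> dict:
    """Send a connector config file to Kafka Connect and return its reply.

    urlopen raises urllib.error.HTTPError on an error status, for example
    409 Conflict if a connector with the same name already exists.
    """
    with open(config_path, "rb") as f:
        body = f.read()
    json.loads(body)  # fail early if the file is not valid JSON
    with urllib.request.urlopen(build_create_request(body, url)) as resp:
        return json.loads(resp.read())


# Usage (inside the tutorial environment):
#   create_connector("stock-source.json")
```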
Run the following command in the shell to check the status of the connectors:
status
If your source connector started successfully, you should see the following output:
Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

Currently configured connectors
[
"mongo-source-marketdata"
]
...
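You can run a similar check against the Kafka Connect REST API endpoint GET /connectors/<name>/status, which returns the connector state and one state per task. This Python sketch uses hypothetical helper names and does not reproduce how the tutorial's status script works; it treats a connector as healthy only when the connector and every task report RUNNING.

```python
import json
import urllib.request
from typing import Any, Dict


def is_running(status: Dict[str, Any]) -> bool:
    """True if the connector and all of its tasks report RUNNING.

    `status` is the JSON body from GET /connectors/<name>/status.
    """
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks yet has not finished starting up.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(name: str, base: str = "http://connect:8083") -> Dict[str, Any]:
    """Fetch the status document for one connector (needs network access)."""
    with urllib.request.urlopen(f"{base}/connectors/{name}/status") as resp:
        return json.loads(resp.read())


# Usage (inside the tutorial environment):
#   is_running(fetch_status("mongo-source-marketdata"))
```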
Once the source connector starts up, confirm the Kafka topic received the collection data by running the following command:
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
As the source connector publishes data, the output shows topic data that resembles the following:
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
You can exit kafkacat by typing CTRL+C.
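In the sample output above, the payload field is itself a string containing the document, so a consumer has to parse each message twice. The following Python sketch unwraps such a message and converts an Extended JSON {"$date": ...} value to a datetime. It assumes the message shape shown above with the date given as milliseconds since the Unix epoch; the helper names are hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def decode_value(raw: str) -> Dict[str, Any]:
    """Unwrap a JsonConverter envelope ({"schema": ..., "payload": ...})."""
    payload = json.loads(raw)["payload"]
    # As in the sample output, the payload is the document serialized as a
    # string, so it needs a second json.loads.
    return json.loads(payload) if isinstance(payload, str) else payload


def ejson_date(value: Dict[str, Any]) -> datetime:
    """Convert an Extended JSON {"$date": <millis>} value to a UTC datetime."""
    millis = value["$date"]
    if isinstance(millis, dict):  # canonical form: {"$numberLong": "..."}
        millis = int(millis["$numberLong"])
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
```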
Configure the Sink Connector
Configure a sink connector to read data from the Kafka topic and write it to a time series collection named StockDataMigrate in a database named Stocks.
Create a sink configuration file called stock-sink.json with the following command:
nano stock-sink.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-sink-marketdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "marketdata.Stocks.PriceData",
    "connection.uri": "mongodb://mongo1",
    "database": "Stocks",
    "collection": "StockDataMigrate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "timeseries.timefield": "tx_time",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
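To see which strings the timeseries.timefield.auto.convert.date.format pattern accepts, here is a rough Python equivalent using datetime.strptime. This is not the connector's own parsing code, which uses a Java date-time pattern; it only shows that values such as 2022-05-25T21:16:35Z, without fractional seconds, match. Because 'Z' is quoted in the pattern, it is treated as a literal character, so this sketch assumes the timestamps are in UTC.

```python
from datetime import datetime, timezone

# Java pattern from the sink config:  yyyy-MM-dd'T'HH:mm:ss'Z'
# Rough strptime equivalent; the quoted 'T' and 'Z' are literal characters.
STRPTIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_tx_time(value: str) -> datetime:
    """Parse a tx_time string in the configured format.

    The literal 'Z' carries no zone information, so this sketch assumes UTC.
    """
    return datetime.strptime(value, STRPTIME_FORMAT).replace(tzinfo=timezone.utc)
```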
Tip
The preceding sink connector configuration uses the connector's time field auto-conversion settings, timeseries.timefield.auto.convert and timeseries.timefield.auto.convert.date.format, to convert the tx_time field from a String to an ISODate. Alternatively, you can use the TimestampConverter Single Message Transform (SMT) to perform this conversion. When you use the TimestampConverter SMT, you must define a schema for the data in the Kafka topic. For information on how to use the TimestampConverter SMT, see the TimestampConverter Confluent documentation.
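As a sketch of the alternative, the settings below add a TimestampConverter transform to the sink configuration in place of the auto-convert options. The alias convertTxTime and the helper with_smt are hypothetical; the transform keys (type, field, target.type, format) come from Apache Kafka's TimestampConverter. This shows only the configuration keys: as the tip notes, the topic data also needs a schema, which this sketch does not set up.

```python
# Hypothetical SMT settings; "convertTxTime" is an arbitrary alias.
smt_settings = {
    "transforms": "convertTxTime",
    "transforms.convertTxTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTxTime.field": "tx_time",
    "transforms.convertTxTime.target.type": "Timestamp",
    "transforms.convertTxTime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
}


def with_smt(sink_config: dict) -> dict:
    """Return a copy of a sink config that uses the SMT instead of the
    timeseries.timefield.auto.convert* options. The input is not modified."""
    inner = {
        k: v
        for k, v in sink_config["config"].items()
        if not k.startswith("timeseries.timefield.auto.convert")
    }
    inner.update(smt_settings)
    return {**sink_config, "config": inner}
```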
Run the following command in the shell to start the sink connector using the configuration file you created:
cx stock-sink.json
After your sink connector finishes processing the topic data, the documents in the StockDataMigrate time series collection contain the tx_time field with an ISODate type value.
Verify the Time Series Collection Data
Once the sink connector finishes processing the topic data, the StockDataMigrate time series collection should contain all the market data from your PriceData collection.
To view the data in MongoDB, run the following command to connect to your replica set using mongosh:
mongosh "mongodb://mongo1"
At the prompt, type the following commands to retrieve all the documents in the Stocks.StockDataMigrate MongoDB namespace:
use Stocks
db.StockDataMigrate.find()
The command should return a list of documents that resemble the following:
{
  tx_time: ISODate("2022-05-25T21:16:35.983Z"),
  _id: ObjectId("628e9..."),
  symbol: 'FAV',
  price: 18.43,
  company_name: 'FUZZY ATTACK VENTURES'
}
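To confirm that StockDataMigrate is a time series collection rather than a regular one, you can inspect its listCollections entry (in mongosh, db.getCollectionInfos({ name: "StockDataMigrate" })). The following Python sketch checks such an entry; the helper name is hypothetical, and the commented usage assumes the PyMongo driver is installed.

```python
from typing import Any, Dict, Optional


def timeseries_time_field(info: Dict[str, Any]) -> Optional[str]:
    """Return the timeField of a time series collection, or None.

    `info` is one entry from the listCollections command, for example as
    yielded by PyMongo's Database.list_collections().
    """
    if info.get("type") != "timeseries":
        return None
    return info.get("options", {}).get("timeseries", {}).get("timeField")


# Usage with PyMongo (requires a running server, so not executed here):
#   from pymongo import MongoClient
#   db = MongoClient("mongodb://mongo1")["Stocks"]
#   info = next(db.list_collections(filter={"name": "StockDataMigrate"}))
#   print(timeseries_time_field(info))  # expected: tx_time
```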
Summary
In this tutorial, you ran a stock ticker data generator that periodically wrote data into a MongoDB collection. You configured a source connector to copy the data into a Kafka topic and configured a sink connector to write that data into a new MongoDB time series collection.
Learn More
Read the following resources to learn more about concepts mentioned in this tutorial: