将现有集合迁移到时间序列集合
在此页面上
按照本教程学习如何使用 MongoDB Kafka Connector 将现有 MongoDB 集合转换为时间序列集合。
时间序列集合可有效存储时间序列数据。 时间序列数据由按时间间隔进行的测量、描述测量的元数据以及测量时间组成。
要使用连接器将数据从 MongoDB 集合转换为时间序列集合,需要执行以下任务:
标识collection中所有文档共有的时间字段。
配置connector以将现有collection数据复制到 Kafka 主题。
配置connector以将 Kafka 主题数据复制到time-series collection。
在本教程中,您将执行上述任务,将股票数据从集合迁移到时间序列集合。 time-series collection 更有效地存储和索引数据,并保留使用聚合操作符分析一段时间内股票表现的能力。
将集合迁移到时间序列集合
完成教程设置
完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。
生成样本数据
运行以下命令,在 Docker 环境中启动脚本,生成样本collection,其中包含教程 MongoDB 副本集中的虚假股票代码及其价格:
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
数据生成器开始运行后,您应该会看到生成的数据如下所示:
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ...
配置 Source Connector
在另一个窗口中,使用以下命令在为教程设置下载的教程 Docker container 上创建交互式 shell 会话:
docker exec -it mongo1 /bin/bash
使用以下命令创建名为 stock-source.json
的源配置文件:
nano stock-source.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-source-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": "true", "connection.uri": "mongodb://mongo1", "topic.prefix": "marketdata", "database": "Stocks", "collection": "PriceData", "copy.existing": "true" } }
此配置指示connector将现有数据从PriceData
MongoDB collection复制到marketdata.Stocks.PriceData
Kafka 主题,并在完成后将任何未来数据插入该collection中。
在 shell 中运行以下命令,使用您创建的配置文件启动源连接器:
cx stock-source.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令检查连接器的状态:
status
如果 Source 连接器成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ...
源连接器启动后,运行以下命令以确认 Kafka 主题已收到集合数据:
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
输出应显示源连接器发布的主题数据,如下所示:
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
您可以通过键入 CTRL+C
退出kafkacat
}。
配置 Sink Connector
配置connector以从Kafka主题读取数据,并将其写入名为Stocks
的数据库中名为StockDataMigrate
的时间序列集合。
使用以下命令创建名为stock-sink.json
的接收器配置文件:
nano stock-sink.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-sink-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "marketdata.Stocks.PriceData", "connection.uri": "mongodb://mongo1", "database": "Stocks", "collection": "StockDataMigrate", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield": "tx_time", "timeseries.timefield.auto.convert": "true", "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
提示
上面的connector配置使用时间字段日期格式转换器。或者,您可以使用TimestampConverter
单一消息转换 (SMT) 将tx_time
字段从String
转换为ISODate
。 使用TimestampConverter
SMT 时,您必须为 Kafka 主题中的数据定义模式。
有关如何使用TimestampConverter
SMT 的信息,请参阅 TimestampConverter Confluent 文档。
在 shell 中运行以下命令,使用更新的配置文件启动 sink connector:
cx stock-sink.json
接收器connector完成对主题数据的处理后,StockDataMigrate
time-series collection中的文档将包含具有tx_time
ISODate
类型值的field。
验证时间序列集合数据
接收器连接器完成对主题数据的处理后, StockDataMigrate
时间序列集合应包含PriceData
集合中的所有市场数据。
要查看 MongoDB 中的数据,请运行以下命令以使用mongosh
连接到副本集:
mongosh "mongodb://mongo1"
在提示符处,键入以下命令以检索 Stocks.StockDataMigrate
MongoDB 命名空间中的所有文档:
use Stocks db.StockDataMigrate.find()
您应该会看到该命令返回的文档列表,类似于以下文档:
{ tx_time: ISODate("2022-05-25T21:16:35.983Z"), _id: ObjectId("628e9..."), symbol: 'FAV', price: 18.43, company_name: 'FUZZY ATTACK VENTURES' }
总结
在本教程中,您创建了一个股票报价机数据生成器,用于定期将数据写入 MongoDB collection。您配置了源连接器以将数据复制到 Kafka 主题中,并配置了接收器连接器以将该数据写入新的 MongoDB 时间序列集合。
了解详情
阅读以下资源,详细了解本教程中提到的概念: