开始使用 MongoDB Kafka Source Connector
按照本教程学习如何配置 MongoDB Kafka 源连接器以读取变更流中的数据并将其发布到 Apache Kafka 主题。
MongoDB Kafka 源连接器入门
完成教程设置
完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。
配置 Source Connector
使用以下命令,在为教程设置下载的教程 Docker 容器上创建交互式 Shell 会话:
docker exec -it mongo1 /bin/bash
使用以下命令创建名为 simplesource.json
的源配置文件:
nano simplesource.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
在 shell 中运行以下命令,使用您创建的配置文件启动源连接器:
cx simplesource.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令检查连接器的状态:
status
如果 Source 连接器成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
创建变更事件
在同一 Shell 中,通过运行以下命令,使用 mongosh
(MongoDB Shell) 连接到 MongoDB:
mongosh "mongodb://mongo1"
连接成功后,您应看到以下 MongoDB shell 提示:
rs0 [direct: primary] test>
在提示符处,键入以下命令以插入新文档:
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
输入 exit
命令以退出 MongoDB Shell。
使用以下命令检查 Kafka 环境的状态:
status
在上述命令的输出中,您应该看到源连接器在收到变更事件后创建的新主题:
... "topic": "Tutorial1.orders", ...
通过运行以下命令,确认新 Kafka 主题上的数据内容:
kc Tutorial1.orders
注意
kc
命令是一个帮助程序脚本,用于输出 Kafka 主题的内容。
运行上述命令时,您应该会看到以下 Kafka 主题数据,这些数据是按“Key”和“Value”部分组织的:
在输出的“Value”部分,您可以在以下格式化的 JSON 文档中找到包含突出显示的 fullDocument
数据的 payload
部分:
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
重新配置变更流
您可以将变更流配置为仅返回 fullDocument
字段,忽略变更流创建的事件中的元数据。
使用以下命令停止连接器:
del mongo-simple-source
注意
del
命令是一个帮助脚本,用于调用 Kafka Connect REST API 来停止连接器,相当于以下命令:
curl -X DELETE connect:8083/connectors/<parameter>
使用以下命令创建名为 simplesource.json
的源配置文件:
nano simplesource.json
删除现有配置,添加以下配置并保存文件:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
在 shell 中运行以下命令,使用您更新的配置文件启动源连接器:
cx simplesource.json
使用以下命令通过 mongosh
连接 MongoDB:
mongosh "mongodb://mongo1"
在提示符处,键入以下命令以插入新文档:
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
通过运行以下命令退出 mongosh
:
exit
通过运行以下命令,确认新 Kafka 主题上的数据内容:
kc Tutorial1.orders
“Value”文档的 payload
字段应仅包含以下文档数据:
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(可选)停止 Docker 容器
完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
提示
更多教程
如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:
docker-compose -p mongo-kafka down
要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。
总结
在本教程中,您使用不同的配置启动源连接器,更改发布到 Kafka 主题的变更流事件数据。
了解详情
阅读以下资源,详细了解本教程中提到的概念: