Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

开始使用 MongoDB Kafka Source Connector

在此页面上

  • MongoDB Kafka 源连接器入门
  • 总结
  • 了解详情

按照本教程学习如何配置 MongoDB Kafka 源连接器以读取变更流中的数据并将其发布到 Apache Kafka 主题。

1

完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。

2

使用以下命令,在为教程设置下载的教程 Docker 容器上创建交互式 Shell 会话:

docker exec -it mongo1 /bin/bash

使用以下命令创建名为 simplesource.json 的源配置文件:

nano simplesource.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

在 shell 中运行以下命令,使用您创建的配置文件启动源连接器:

cx simplesource.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令检查连接器的状态:

status

如果 Source 连接器成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
3

在同一 Shell 中,通过运行以下命令,使用 mongosh (MongoDB Shell) 连接到 MongoDB:

mongosh "mongodb://mongo1"

连接成功后,您应看到以下 MongoDB shell 提示:

rs0 [direct: primary] test>

在提示符处,键入以下命令以插入新文档:

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

输入 exit 命令以退出 MongoDB Shell。

使用以下命令检查 Kafka 环境的状态:

status

在上述命令的输出中,您应该看到源连接器在收到变更事件后创建的新主题:

...
"topic": "Tutorial1.orders",
...

通过运行以下命令,确认新 Kafka 主题上的数据内容:

kc Tutorial1.orders

注意

kc 命令是一个帮助程序脚本,用于输出 Kafka 主题的内容。

运行上述命令时,您应该会看到以下 Kafka 主题数据,这些数据是按“Key”和“Value”部分组织的:

在输出的“Value”部分,您可以在以下格式化的 JSON 文档中找到包含突出显示的 fullDocument 数据的 payload 部分:

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
4

您可以将变更流配置为仅返回 fullDocument 字段,忽略变更流创建的事件中的元数据。

使用以下命令停止连接器:

del mongo-simple-source

注意

del 命令是一个帮助脚本,用于调用 Kafka Connect REST API 来停止连接器,相当于以下命令:

curl -X DELETE connect:8083/connectors/<parameter>

使用以下命令创建名为 simplesource.json 的源配置文件:

nano simplesource.json

删除现有配置,添加以下配置并保存文件:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

在 shell 中运行以下命令,使用您更新的配置文件启动源连接器:

cx simplesource.json

使用以下命令通过 mongosh 连接 MongoDB:

mongosh "mongodb://mongo1"

在提示符处,键入以下命令以插入新文档:

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

通过运行以下命令退出 mongosh

exit

通过运行以下命令,确认新 Kafka 主题上的数据内容:

kc Tutorial1.orders

“Value”文档的 payload 字段应仅包含以下文档数据:

{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
5

完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。

提示

更多教程

如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。

选择与要运行的删除任务对应的选项卡。

运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:

docker-compose -p mongo-kafka down --rmi all

运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:

docker-compose -p mongo-kafka down

要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。

在本教程中,您使用不同的配置启动源连接器,更改发布到 Kafka 主题的变更流事件数据。

阅读以下资源,详细了解本教程中提到的概念:

  • 源连接器配置属性

  • Kafka Connect REST API

后退

深入了解 MongoDB 变更流