Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

开始使用 MongoDB Kafka Sink Connector

在此页面上

  • 开始使用 MongoDB Kafka Sink Connector
  • 总结
  • 了解详情

跟随本教程操作,学习如何配置 MongoDB Kafka 接收器连接器,以从 Apache Kafka 主题读取数据并将其写入 MongoDB 集合。

1

完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。

2

使用以下命令在教程 Docker 容器上创建交互式 Shell 会话:

docker exec -it mongo1 /bin/bash

使用以下命令创建名为 simplesink.json 的源配置文件:

nano simplesink.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "Tutorial2.pets",
"connection.uri": "mongodb://mongo1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "Tutorial2",
"collection": "pets"
}
}

注意

配置属性中突出显示的行指定了转换器,用于指示连接器如何转换来自 Kafka 的数据。

在 shell 中运行以下命令,使用您创建的配置文件启动 Sink Connector:

cx simplesink.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:

curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令检查连接器的状态:

status

如果 Sink Connector 成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
Currently configured connectors
[
"mongo-tutorial-sink"
]
...
3

在同一 Shell 中,创建一个 Python 脚本以将数据写入到 Kafka 主题。

nano kafkawrite.py

将以下代码粘贴到文件中,然后保存更改:

from kafka import KafkaProducer
import json
from json import dumps
p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8'))
data = {'name': 'roscoe'}
p.send('Tutorial2.pets', value = data)
p.flush()

运行 Python 脚本:

python3 kafkawrite.py
4

在同一 Shell 中,通过运行以下命令,使用 mongosh (MongoDB Shell) 连接到 MongoDB:

mongosh "mongodb://mongo1"

连接成功后,您应看到以下 MongoDB shell 提示:

rs0 [direct: primary] test>

在提示符处,键入以下命令以检索 Tutorial2.pets MongoDB 命名空间中的所有文档:

use Tutorial2
db.pets.find()

您应该会在结果中看到以下文档:

{ _id: ObjectId("62659..."), name: 'roscoe' }

输入 exit 命令以退出 MongoDB Shell。

5

完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。

提示

更多教程

如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。

选择与要运行的删除任务对应的选项卡。

运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:

docker-compose -p mongo-kafka down --rmi all

运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:

docker-compose -p mongo-kafka down

要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。

在本教程中,您配置了一个接收器连接器,以将数据从 Kafka 主题保存到 MongoDB 集群中的集合。

阅读以下资源,详细了解本教程中提到的概念:

  • 接收器连接器配置属性

  • Kafka Connector 转换器简介

  • Kafka Connect REST API

后退

开始使用 MongoDB Kafka Source Connector