Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

使用变更数据捕获处理程序复制数据

在此页面上

  • Overview
  • 使用 CDC 处理器复制数据
  • 完成教程设置
  • 启动交互式 shell
  • 配置 Source Connector
  • 配置 Sink Connector
  • 监控“Kafka 主题”
  • 将数据写入源数据库并观察数据流
  • (可选)生成其他更改
  • 总结
  • 了解详情

跟随本教程学习如何使用更改数据捕获 (CDC) 处理程序与 MongoDB Kafka Connector 一起复制数据。CDC 处理程序是将 CDC 事件转换为 MongoDB 写操作的应用程序。如果需要将单个数据存储中的更改复制到另一个数据存储中,请使用 CDC 处理程序。

本教程中,您将配置并运行 MongoDB Kafka 源连接器和接收连接器,以便使用 CDC 使两个 MongoDB 集合包含相同的文档。源连接器将原始集合中的变更流数据写入 Kafka 主题,接收连接器将 Kafka 主题数据写入目标 MongoDB 集合。

如果您想详细学习;了解CDC 处理程序的工作原理,请参阅变更数据捕获处理程序指南。

1

完成 Kafka Connector 教程设置中的步骤,以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

在不同窗口中的 Docker 容器上启动两个交互式 Shell。在本教程中,您可以使用 Shell 来运行和观察不同任务。

从终端运行以下命令,启动交互式 Shell。

docker exec -it mongo1 /bin/bash

在本教程中,我们将此交互式 shell 称为 CDCShell1

在第二个终端运行以下命令,启动交互式 Shell:

docker exec -it mongo1 /bin/bash

在本教程中,我们将把这个交互式 Shell 称为 CDCShell2

请将屏幕上的两个窗口排列好,以便同时查看实时更新。

使用 CDCShell1 配置连接器并监控 Kafka 主题。使用 CDCShell2 在 MongoDB 中执行写操作。

3

CDCShell1 中,配置一个源连接器,从 CDCTutorial.Source MongoDB 命名空间读取数据并写入 CDCTutorial.Source Kafka 话题。

使用以下命令创建名为 cdc-source.json 的配置文件:

nano cdc-source.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

CDCShell1 中运行以下命令,使用您创建的配置文件启动源连接器:

cx cdc-source.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令检查连接器的状态:

status

如果 Source 连接器成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1 中,配置接收器连接器以将数据从 CDCTutorial.Source Kafka 主题复制到 CDCTutorial.Destination MongoDB 命名空间。

使用以下命令创建名为 cdc-sink.json 的配置文件:

nano cdc-sink.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

在 shell 中运行以下命令,使用您创建的配置文件启动 Sink Connector:

cx cdc-sink.json

在 shell 中运行以下命令检查连接器的状态:

status

如果 Sink Connector 成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1 中,监控 Kafka 主题是否有传入事件。运行以下命令启动 kafkacat 应用程序,将输出发布到主题的数据:

kc CDCTutorial.Source

注意

kc 命令是教程开发环境中包含的自定义脚本,调用 kafkacat 应用程序的选项来连接到 Kafka 并格式化指定主题的输出。

启动后,您将看到以下输出,显示当前没有数据可读取:

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2 中,通过运行以下命令,使用 mongosh (MongoDB Shell) 连接到 MongoDB:

mongosh "mongodb://mongo1"

连接成功后,您应看到以下 MongoDB shell 提示:

rs0 [direct: primary] test>

在提示符处,键入以下命令以将新文档插入到CDCTutorial.Source MongoDB 命名空间中:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

源连接器接收更改并将其发布到 Kafka 主题。您应该会在 CDCShell1 窗口中看到以下主题信息:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

Sink 连接器接收 Kafka 消息并将数据汇入 MongoDB。您可以通过在 CDCShell2 中启动的 MongoDB shell 中运行以下命令,从 MongoDB 中的 CDCTutorial.Destination 命名空间检索文档:

db.Destination.find()

您应该会在结果中看到以下文档:

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

尝试通过从 MongoDB Shell 运行以下命令来从 CDCTutorial.Source 命名空间中删除文档:

db.Source.deleteMany({})

您应该会在 CDCShell1 窗口中看到以下主题信息:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

运行以下命令以检索集合中的当前文档数:

db.Destination.count()

此操作会返回以下输出,表明集合为空:

0

运行以下命令退出 MongoDB shell:

exit

在本教程中,您将设置一个源连接器来捕获对 MongoDB 集合的更改并将其发送到 Apache Kafka。您还使用 MongoDB CDC 处理程序配置了接收器连接器,以将数据从 Apache Kafka 移动到 MongoDB 集合。

阅读以下资源,详细了解本教程中提到的概念:

后退

开始使用 MongoDB Kafka Sink Connector