Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/ /

复制现有数据

此用法示例演示如何使用 MongoDB Kafka 源连接器将数据从 MongoDB 集合复制到 Apache Kafka 主题。

假设您需要将 MongoDB 集合复制到 Apache Kafka 并过滤一些数据。

要求和解决方案如下:

要求
解决方案
将 MongoDB 部署中的 shopping 数据库的 customers 集合复制到 Apache Kafka 主题上。
See the Copy Data section of this guide.
仅复制 country 字段中值为“Mexico”的文档。
See the Filter Data section of this guide.

customers 集合包含以下文档:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

通过在 Source 连接器中指定以下配置选项,复制 shopping 数据库的 customers 集合的内容:

database=shopping
collection=customers
startup.mode=copy_existing

源连接器通过创建用于描述将每个文档插入集合中的变更事件文档来复制集合。

注意

数据复制可以生成重复事件

如果任何系统在 Source 连接器从数据库转换现有数据时更改数据库中的数据,MongoDB 可能会生成重复的变更流事件以反映最新更改。由于数据复制所依赖的变更流事件是幂等的,因此复制的数据最终是一致的。

要学习;了解有关变更事件文档的详情,请参阅 Change Streams指南。

要了解有关 startup.mode 选项的更多信息,请参阅启动属性

您可以通过在源连接器配置的 startup.mode.copy.existing.pipeline 选项中指定聚合管道来过滤数据。以下配置指定一个聚合管道,与 country 字段中包含“Mexico”的所有文档相匹配:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

要了解有关 startup.mode.copy.existing.pipeline 选项的更多信息,请参阅启动属性

要了解有关聚合管道的更多信息,请参阅以下资源:

复制 customers 集合的最终源连接器配置应该如下所示:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

连接器复制数据后,您会看到与 shopping.customers Apache Kafka 主题中的上述示例集合相对应的以下变更事件文档:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

注意

将主题中的数据写入集合

使用变更数据捕获处理程序将 Apache Kafka 主题中的变更事件文档转换为 MongoDB 写入操作。要了解更多信息,请参阅变更数据捕获处理程序指南。

后退

主题命名