复制现有数据
此用法示例演示如何使用 MongoDB Kafka 源连接器将数据从 MongoDB 集合复制到 Apache Kafka 主题。
例子
假设您需要将 MongoDB 集合复制到 Apache Kafka 并过滤一些数据。
要求和解决方案如下:
要求 | 解决方案 |
---|---|
将 MongoDB 部署中的 shopping 数据库的 customers 集合复制到 Apache Kafka 主题上。 | See the Copy Data section of this guide. |
仅复制 country 字段中值为“Mexico”的文档。 | See the Filter Data section of this guide. |
customers
集合包含以下文档:
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
复制数据
通过在 Source 连接器中指定以下配置选项,复制 shopping
数据库的 customers
集合的内容:
database=shopping collection=customers startup.mode=copy_existing
源连接器通过创建用于描述将每个文档插入集合中的变更事件文档来复制集合。
注意
数据复制可以生成重复事件
如果任何系统在 Source 连接器从数据库转换现有数据时更改数据库中的数据,MongoDB 可能会生成重复的变更流事件以反映最新更改。由于数据复制所依赖的变更流事件是幂等的,因此复制的数据最终是一致的。
要学习;了解有关变更事件文档的详情,请参阅 Change Streams指南。
要了解有关 startup.mode
选项的更多信息,请参阅启动属性。
过滤数据
您可以通过在源连接器配置的 startup.mode.copy.existing.pipeline
选项中指定聚合管道来过滤数据。以下配置指定一个聚合管道,与 country
字段中包含“Mexico”的所有文档相匹配:
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
要了解有关 startup.mode.copy.existing.pipeline
选项的更多信息,请参阅启动属性。
要了解有关聚合管道的更多信息,请参阅以下资源:
自定义管道以筛选更改事件使用示例
MongoDB 手册中的聚合。
指定配置
复制 customers
集合的最终源连接器配置应该如下所示:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
连接器复制数据后,您会看到与 shopping.customers
Apache Kafka 主题中的上述示例集合相对应的以下变更事件文档:
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }