变更数据捕获处理程序
Overview
了解如何使用 MongoDB Kafka 接收器连接器来复制变更数据捕获 (CDC) 事件。CDC 是一种软件架构,它可将数据存储中的更改转换为 CDC 事件流。CDC 事件是一种包含对数据存储所执行更改的可重现表示形式的消息。复制数据是指将 CDC 事件中包含的更改从一个数据存储应用到另一数据存储,以便更改在两个数据存储中执行这些更改的过程。
使用 CDC 处理程序将存储在 Apache Kafka 主题中的 CDC 事件复制到 MongoDB。CDC 处理程序是一种特殊程序,它可将来自特定 CDC 事件生成器的 CDC 事件转换为 MongoDB 写入操作。
CDC 事件生成器是一个生成 CDC 事件的应用程序。CDC 事件生成器可以是数据存储,也可以是监视数据存储并生成与数据存储中的变更相对应的 CDC 事件的应用程序。
注意
MongoDB 变更流是 CDC 架构的一个示例。要了解有关变更流的更多信息,请参阅 MongoDB Kafka Connector 变更流指南。
如果您想查看如何复制数据的演示教程,请参阅使用变更数据捕获处理程序复制数据的教程。
指定 CDC 处理程序
您可以使用以下配置选项在接收器连接器上指定 CDC 处理程序:
change.data.capture.handler=<cdc handler class>
要了解详情,请参阅更改数据捕获配置选项。
可用 CDC 处理程序
接收器连接器为以下 CDC 事件生成器提供 CDC 处理程序:
MongoDB
单机以下标签页,了解如何为前面的事件生成器配置 CDC 处理程序:
以下属性文件配置接收器连接器以复制 MongoDB 变更事件文档:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing mongodb change event documents> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
要查看 MongoDB CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。
您的接收器连接器可以复制来自这些数据存储的 Debezium CDC 事件:
MongoDB
Postgres
MySQL
单击以下标签页,查看如何配置 Debezium CDC 处理程序以从上述每个数据存储复制 CDC 事件:
以下属性文件配置接收器连接器以复制与 MongoDB 实例中的更改相对应的 Debezium CDC 事件:
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler
注意
如果您使用的是早于2.0的 Debezium CDC 版本,请将 change.data.capture.handler
属性的值设立为com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler
。
要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。
以下属性文件配置接收器连接器,复制与 Postgres 实例中的更改相对应的 Debezium CDC 事件:
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler
要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。
以下属性文件配置接收器连接器以复制与 MySQL 实例中的变更相对应的 Debezium CDC 事件:
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler
要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。
注意
自定义 Debezium CDC 处理程序
如果 Debezium CDC 处理程序无法从您的数据存储中复制 CDC 事件,则可以通过扩展 debeziumCDChandler 类来自定义处理程序。有关自定义 CDC 处理程序的更多信息,请参阅本指南的创建您自己的 CDC 处理程序部分。
您的接收器连接器可以复制来自这些数据存储的 Qlik Replicate CDC 事件:
OracleDB
MySQL
Postgres
以下属性文件配置接收器连接器以复制 Qlik Replicate CDC 事件:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing qlik replicate cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler
要查看 Qlik Replicate CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。
注意
自定义 Qlik Replicate CDC 处理程序
如果 Qlik Replicate CDC 处理程序无法从您的数据存储中复制 CDC 事件,则可以通过扩展 Q likcdChandler 类来自定义处理程序。有关自定义 CDC 处理程序的更多信息,请参阅本指南的创建您自己的 CDC 处理程序部分。
创建自己的 CDC 处理程序
如果没有任何预构建的 CDC 处理程序适合您的使用案例,还可以自己创建一个。您的自定义 CDC 处理程序是一个 Java 类,用于实施 CdcHandler
接口。
要了解更多信息,请参阅 CdcHandler 接口的源代码。
要查看 CDC 处理程序实现的示例,请参阅预构建 CDC 处理程序的源代码。
如何使用 CDC 处理程序
要将 sink connector 配置为使用自定义 CDC 处理程序,您必须执行以下动作:
将自定义 CDC 处理程序类编译为 JAR 文件。
将编译好的 JAR 添加到 Kafka 工作进程的类路径/插件路径中。有关插件路径的更多信息,请参阅 Confluent 文档。
注意
Kafka Connect 独立加载插件。部署自定义写策略时,Connector JAR 和 CDC 处理程序 JAR 应位于同一路径上。您的路径应类似如下:
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar
要了解有关 Kafka Connect 插件的更多信息,请参阅 Confluent 提供的指南。
在
change.data.capture.handler
配置设置中指定自定义类。
要了解如何将类编译为 JAR 文件,请参阅 Oracle 提供的这份指南。