Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/ /

变更数据捕获处理程序

在此页面上

  • Overview
  • 指定 CDC 处理程序
  • 可用 CDC 处理程序
  • 创建自己的 CDC 处理程序
  • 如何使用 CDC 处理程序

了解如何使用 MongoDB Kafka 接收器连接器来复制变更数据捕获 (CDC) 事件。CDC 是一种软件架构,它可将数据存储中的更改转换为 CDC 事件流。CDC 事件是一种包含对数据存储所执行更改的可重现表示形式的消息。复制数据是指将 CDC 事件中包含的更改从一个数据存储应用到另一数据存储,以便更改在两个数据存储中执行这些更改的过程。

使用 CDC 处理程序将存储在 Apache Kafka 主题中的 CDC 事件复制到 MongoDB。CDC 处理程序是一种特殊程序,它可将来自特定 CDC 事件生成器的 CDC 事件转换为 MongoDB 写入操作。

CDC 事件生成器是一个生成 CDC 事件的应用程序。CDC 事件生成器可以是数据存储,也可以是监视数据存储并生成与数据存储中的变更相对应的 CDC 事件的应用程序。

注意

MongoDB 变更流是 CDC 架构的一个示例。要了解有关变更流的更多信息,请参阅 MongoDB Kafka Connector 变更流指南。

如果您想查看如何复制数据的演示教程,请参阅使用变更数据捕获处理程序复制数据的教程

重要

CDC 和后处理器

您无法将后处理器应用于 CDC 事件。如果尝试同时指定二者,此连接器则会记录一则警告。

您可以使用以下配置选项在接收器连接器上指定 CDC 处理程序:

change.data.capture.handler=<cdc handler class>

要了解详情,请参阅更改数据捕获配置选项。

接收器连接器为以下 CDC 事件生成器提供 CDC 处理程序:

  • MongoDB

  • Debezium

  • Qlik Replicate

单机以下标签页,了解如何为前面的事件生成器配置 CDC 处理程序:

以下属性文件配置接收器连接器以复制 MongoDB 变更事件文档:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing mongodb change event documents>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

要查看 MongoDB CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码。

您的接收器连接器可以复制来自这些数据存储的 Debezium CDC 事件:

  • MongoDB

  • Postgres

  • MySQL

单击以下标签页,查看如何配置 Debezium CDC 处理程序以从上述每个数据存储复制 CDC 事件:

以下属性文件配置接收器连接器以复制与 MongoDB 实例中的更改相对应的 Debezium CDC 事件:

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler

注意

如果您使用的是早于2.0的 Debezium CDC 版本,请将 change.data.capture.handler属性的值设立为com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler

要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码

以下属性文件配置接收器连接器,复制与 Postgres 实例中的更改相对应的 Debezium CDC 事件:

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler

要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码

以下属性文件配置接收器连接器以复制与 MySQL 实例中的变更相对应的 Debezium CDC 事件:

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler

要查看 Debezium CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码

注意

自定义 Debezium CDC 处理程序

如果 Debezium CDC 处理程序无法从您的数据存储中复制 CDC 事件,则可以通过扩展 debeziumCDChandler 类来自定义处理程序。有关自定义 CDC 处理程序的更多信息,请参阅本指南的创建您自己的 CDC 处理程序部分。

您的接收器连接器可以复制来自这些数据存储的 Qlik Replicate CDC 事件:

  • OracleDB

  • MySQL

  • Postgres

以下属性文件配置接收器连接器以复制 Qlik Replicate CDC 事件:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing qlik replicate cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler

要查看 Qlik Replicate CDC 处理程序的源代码,请参阅 MongoDB Kafka Connector 源代码

注意

自定义 Qlik Replicate CDC 处理程序

如果 Qlik Replicate CDC 处理程序无法从您的数据存储中复制 CDC 事件,则可以通过扩展 Q likcdChandler 类来自定义处理程序。有关自定义 CDC 处理程序的更多信息,请参阅本指南的创建您自己的 CDC 处理程序部分。

如果没有任何预构建的 CDC 处理程序适合您的使用案例,还可以自己创建一个。您的自定义 CDC 处理程序是一个 Java 类,用于实施 CdcHandler 接口。

要了解更多信息,请参阅 CdcHandler 接口的源代码。

要查看 CDC 处理程序实现的示例,请参阅预构建 CDC 处理程序的源代码。

要将 sink connector 配置为使用自定义 CDC 处理程序,您必须执行以下动作:

  1. 将自定义 CDC 处理程序类编译为 JAR 文件。

  2. 将编译好的 JAR 添加到 Kafka 工作进程的类路径/插件路径中。有关插件路径的更多信息,请参阅 Confluent 文档。

    注意

    Kafka Connect 独立加载插件。部署自定义写策略时,Connector JAR 和 CDC 处理程序 JAR 应位于同一路径上。您的路径应类似如下:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar

    要了解有关 Kafka Connect 插件的更多信息,请参阅 Confluent 提供的指南

  3. change.data.capture.handler 配置设置中指定自定义类。

要了解如何将类编译为 JAR 文件,请参阅 Oracle 提供的这份指南

后退

Error Handling