Change Data Capture Handlers

On this page

  • Overview
  • Specify a CDC Handler
  • Available CDC Handlers
  • Create Your Own CDC Handler
  • How to Use Your CDC Handler

Learn how to replicate your change data capture (CDC) events with a MongoDB Kafka sink connector. CDC is a software architecture that converts changes in a datastore into a stream of CDC events. A CDC event is a message containing a reproducible representation of a change performed on a datastore. Replicating data is the process of applying the changes contained in CDC events from one datastore to a different datastore so that the changes occur in both datastores.

Use a CDC handler to replicate CDC events stored on an Apache Kafka topic into MongoDB. A CDC handler is a program that translates CDC events from a specific CDC event producer into MongoDB write operations.

A CDC event producer is an application that generates CDC events. CDC event producers can be datastores, or applications that watch datastores and generate CDC events corresponding to changes in the datastores.
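
To make the replication idea above concrete, the following is a minimal sketch, not connector code. It assumes a hypothetical `CdcEvent` type with three operations and uses an in-memory map as a stand-in for the target datastore. None of these names come from the MongoDB Kafka Connector.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-ins only: these types are NOT part of the MongoDB Kafka Connector API.
class ReplicationSketch {

    // The kinds of change a hypothetical CDC event can describe.
    enum Op { INSERT, UPDATE, DELETE }

    // A hypothetical CDC event: the operation, the key of the affected record,
    // and the record's new value (null for deletes).
    static final class CdcEvent {
        final Op op;
        final String key;
        final String value;

        CdcEvent(Op op, String key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    // Applies one event to the target store, reproducing the change made on the source.
    static void apply(Map<String, String> target, CdcEvent event) {
        switch (event.op) {
            case INSERT:
            case UPDATE:
                target.put(event.key, event.value);
                break;
            case DELETE:
                target.remove(event.key);
                break;
        }
    }

    // Replays an ordered list of events against an initially empty target store.
    static Map<String, String> replicate(List<CdcEvent> events) {
        Map<String, String> target = new LinkedHashMap<>();
        for (CdcEvent event : events) {
            apply(target, event);
        }
        return target;
    }

    public static void main(String[] args) {
        List<CdcEvent> events = Arrays.asList(
            new CdcEvent(Op.INSERT, "user:1", "Ada"),
            new CdcEvent(Op.INSERT, "user:2", "Grace"),
            new CdcEvent(Op.UPDATE, "user:1", "Ada Lovelace"),
            new CdcEvent(Op.DELETE, "user:2", null));
        System.out.println(replicate(events)); // prints {user:1=Ada Lovelace}
    }
}
```

In this sketch the order of events matters: replaying the same events in a different order could leave the target in a different state than the source.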

Note

MongoDB change streams are an example of a CDC architecture. To learn more about change streams, see the MongoDB Kafka Connector guide on Change Streams.

For a tutorial that demonstrates how to replicate data, see the Replicate Data With a Change Data Capture Handler tutorial.

Important

CDC and Post Processors

You cannot apply a post processor to CDC event data. If you attempt to specify both, the connector logs a warning.

You can specify a CDC handler on your sink connector with the following configuration option:

change.data.capture.handler=<cdc handler class>

To learn more, see change data capture configuration options.

The sink connector provides CDC handlers for the following CDC event producers:

  • MongoDB

  • Debezium

  • Qlik Replicate

The following sections show how to configure CDC handlers for each of the preceding event producers:

The following properties file configures a sink connector to replicate MongoDB change event documents:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing mongodb change event documents>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

To view the source code for the MongoDB CDC handler, see the MongoDB Kafka Connector source code.

Your sink connector can replicate Debezium CDC events originating from these datastores:

  • MongoDB

  • Postgres

  • MySQL

The following sections show how to configure the Debezium CDC handler to replicate CDC events from each of the preceding datastores:

The following properties file configures a sink connector to replicate Debezium CDC events corresponding to changes in a MongoDB instance:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler

Note

If you are using a Debezium CDC version earlier than 2.0, set the value of the change.data.capture.handler property to com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.

To view the source code for the Debezium CDC handler, see the MongoDB Kafka Connector source code.

The following properties file configures a sink connector to replicate Debezium CDC events corresponding to changes in a Postgres instance:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler

To view the source code for the Debezium CDC handler, see the MongoDB Kafka Connector source code.

The following properties file configures a sink connector to replicate Debezium CDC events corresponding to changes in a MySQL instance:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler

To view the source code for the Debezium CDC handler, see the MongoDB Kafka Connector source code.

Note

Customize the Debezium CDC Handler

If the Debezium CDC handler cannot replicate CDC events from your datastore, you can customize the handler by extending the DebeziumCdcHandler class. For more information on custom CDC handlers, see the Create Your Own CDC Handler section of this guide.

Your sink connector can replicate Qlik Replicate CDC events originating from these datastores:

  • OracleDB

  • MySQL

  • Postgres

The following properties file configures a sink connector to replicate Qlik Replicate CDC events:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing qlik replicate cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler

To view the source code for the Qlik Replicate CDC handler, see the MongoDB Kafka Connector source code.

Note

Customize the Qlik Replicate CDC Handler

If the Qlik Replicate CDC handler cannot replicate CDC events from your datastore, you can customize the handler by extending the QlikCdcHandler class. For more information on custom CDC handlers, see the Create Your Own CDC Handler section of this guide.

If none of the prebuilt CDC handlers fit your use case, you can create your own. Your custom CDC handler is a Java class that implements the CdcHandler interface.

To learn more, see the source code for the CdcHandler interface.

To view examples of CDC handler implementations, see the source code for the prebuilt CDC handlers.
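
The following sketch illustrates the kind of translation logic a custom CDC handler performs. It is not written against the connector's CdcHandler type: `ChangeEvent`, `WriteOp`, and the `handle` method shown here are hypothetical stand-ins, and the operation codes are assumptions chosen for illustration. Consult the CdcHandler source code for the actual contract your class must satisfy.

```java
import java.util.Optional;

// Illustrative stand-ins only: ChangeEvent and WriteOp are hypothetical types,
// not classes from the MongoDB Kafka Connector.
class HandlerSketch {

    // A simplified description of a MongoDB write operation.
    static final class WriteOp {
        final String kind; // "replaceOne" or "deleteOne"
        final String id;   // value used to match the target document
        final String body; // replacement document as JSON; null for deletes

        WriteOp(String kind, String id, String body) {
            this.kind = kind;
            this.id = id;
            this.body = body;
        }
    }

    // A simplified CDC event as a producer might emit it.
    static final class ChangeEvent {
        final String op;    // assumed codes: "c" = create, "u" = update, "d" = delete
        final String id;    // identifier of the changed record
        final String after; // record state after the change; null for deletes

        ChangeEvent(String op, String id, String after) {
            this.op = op;
            this.id = id;
            this.after = after;
        }
    }

    // Translates one event into at most one write operation.
    static Optional<WriteOp> handle(ChangeEvent event) {
        if (event == null || event.op == null) {
            return Optional.empty(); // nothing to replicate
        }
        switch (event.op) {
            case "c":
            case "u":
                // Creates and updates both carry the post-change state,
                // so this sketch models them as a replace.
                return Optional.of(new WriteOp("replaceOne", event.id, event.after));
            case "d":
                return Optional.of(new WriteOp("deleteOne", event.id, null));
            default:
                return Optional.empty(); // unrecognized operation: skip it
        }
    }

    public static void main(String[] args) {
        WriteOp write = handle(new ChangeEvent("d", "42", null)).get();
        System.out.println(write.kind); // prints deleteOne
    }
}
```

Returning an empty result for unrecognized events is one possible design choice. A handler could instead throw an exception so that the problem surfaces through the connector's error handling.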

To configure your sink connector to use your custom CDC handler, complete the following steps:

  1. Compile your custom CDC handler class to a JAR file.

  2. Add the compiled JAR to the classpath/plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.

    Note

    Kafka Connect loads plugins in isolation. When you deploy a custom CDC handler, both the connector JAR and the CDC handler JAR should be on the same path. Your paths should resemble the following:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar

    To learn more about Kafka Connect plugins, see this guide from Confluent.

  3. Specify your custom class in the change.data.capture.handler configuration setting.

To learn how to compile a class to a JAR file, see this guide from Oracle.
