Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

変更データ キャプチャ ハンドラー

項目一覧

  • Overview
  • CDC ハンドラーを指定する
  • 使用可能な CDC ハンドラー
  • 独自の CDC ハンドラーを作成する
  • CDC ハンドラーの使用方法

MongoDB Kafka シンク コネクタを使用して、変更データ キャプチャ(CDC)イベントを複製する方法を学びます。CDCは、データストアの変更を CDC イベントのストリームに変換するソフトウェア アーキテクチャです。CDC イベントは、データストアで実行された変更の再現可能な表現を含むメッセージです。データの複製とは、あるデータストアの CDC イベントに含まれる変更を別のデータストアに適用し、両方のデータストアで変更が行われるようにするプロセスです。

CDC ハンドラーを使用して、Apache Kafka トピックに保存されている CDC イベントを MongoDB に複製します。CDC ハンドラーは、特定のCDC イベント プロデューサーからの CDC イベントを MongoDB への書込み (write) 操作に変換するプログラムです。

CDC イベント プロデューサーは、CDC イベントを生成するアプリケーションです。CDC イベント プロデューサーは、データストア、またはデータストアを監視し、データストアの変更に対応する CDC イベントを生成するアプリケーションです。

注意

MongoDB Change Streams は CDC アーキテクチャの例です。 変更ストリームの詳細については、「 MongoDBKafka ConnectorChange Streams」の ガイドを参照してください。

データを複製する方法のチュートリアル デモは、「変更データ キャプチャ ハンドラーによるデータの複製」チュートリアルをご覧ください。

重要

CDC およびポストプロセッサ

CDC イベント データにポストプロセッサを適用することはできません。両方を指定しようとすると、コネクタは警告をログに記録します。

次の構成オプションを使用して、シンク コネクタに CDC ハンドラーを指定できます。

change.data.capture.handler=<cdc handler class>

詳細については、「データ キャプチャ設定オプションの変更」を参照してください。

シンク コネクタは、次の CDC イベント プロデューサー用の CDC ハンドラーを提供します。

  • MongoDB

  • Debezium

  • Qlik の複製

上記のイベント プロデューサーの CDC ハンドラーを構成する方法については、以下のタブをクリックしてください。

以下のプロパティ ファイルでは、MongoDB 変更イベント ドキュメントを複製するように Sink Connector を構成します。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing mongodb change event documents>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

MongoDB CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。

Sink Connector は、これらのデータストアから発生した Debezium CDC イベントを複製できます。

  • MongoDB

  • Postgres

  • MySQL

次のタブをクリックすると、Debezium CDC ハンドラーを構成して、前述の各データストアから CDC イベントを複製する方法を確認できます。

以下のプロパティ ファイルは、MongoDB インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler

注意

2.0より前のバージョンの Debezium CDC を使用している場合は、 change.data.capture.handlerプロパティの値をcom.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandlerに設定します。

Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。

以下のプロパティ ファイルは、Postgres インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler

Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。

以下のプロパティ ファイルは、MySQL インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。

connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector
connection.uri=<your connection uri>
database=<your mongodb database>
collection=<your mongodb collection>
topics=<topic containing debezium cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler

Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。

注意

Debezium CDC ハンドラーのカスタマイズ

Debezium CDCハンドラーがデータストアからCDCイベントを複製できない場合は、DebeziumCdcHandler クラスを拡張してハンドラーをカスタマイズできます。カスタム CDC ハンドラーの詳細については、このガイドの「独自の CDC ハンドラーの作成」セクションを参照してください。

シンク コネクタは、次のデータストアから発生した Qlik Replicate CDC イベントを複製できます。

  • OracleDB

  • MySQL

  • Postgres

以下のプロパティ ファイルでは、Qlik Replicate CDC イベントを複製するように Sink Connector を構成します。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your connection uri>
database=<your database>
collection=<your collection>
topics=<topic containing qlik replicate cdc events>
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler

Qlik Replicate CDC ハンドラーのソースコードを表示するには、MongoDB Kafka Connector ソースコードを参照してください。

注意

Qlik 複製 CDC ハンドラーのカスタマイズ

Qlik Replicate CDC ハンドラーがデータストアから CDC イベントを複製できない場合は、 QlikCdcHandler を拡張してハンドラーをカスタマイズできます クラス。カスタム CDC ハンドラーの詳細については、このガイドの「独自の CDC ハンドラーの作成」セクションを参照してください。

事前に構築された CDC ハンドラーでユースケースに適合するものがない場合は、独自のハンドラーを作成できます。カスタム CDC ハンドラーは、CdcHandler インターフェイスを実装する Java クラスです。

詳細については 、 CdcHandler インターフェースのソースコード を参照してください。

CDC ハンドラーの実装例については、「事前に構築された CDC ハンドラーのソースコード」を参照してください。

カスタム CDC ハンドラーを使用するようにシンク コネクタを構成するには、次のアクションを実行する必要があります。

  1. カスタム CDC ハンドラー クラスを JAR ファイルにコンパイルします。

  2. コンパイルされた JAR をKafkaワーカーのクラスパス/プラグイン パスに追加します。 プラグイン パスの詳細については、 Confluent のドキュメントを参照してください。

    注意

    Kafka Connect はプラグインを分離してロードします。カスタム書込みストラテジーを展開する場合、connector JAR と CDC ハンドラー JAR の両方が同じパスにある必要があります。パスは次のようになります。

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar

    Kafka Connect プラグインの詳細については、Confluent のこちらのガイドを参照してください。

  3. change.data.capture.handler 構成設定でカスタム クラスを指定します。

クラスを JAR ファイルにコンパイルする方法については、Oracle のこちらのガイドを参照してください。

戻る

Error Handling