変更データ キャプチャ ハンドラー
Overview
MongoDB Kafka シンク コネクタを使用して、変更データ キャプチャ(CDC)イベントを複製する方法を学びます。CDCは、データストアの変更を CDC イベントのストリームに変換するソフトウェア アーキテクチャです。CDC イベントは、データストアで実行された変更の再現可能な表現を含むメッセージです。データの複製とは、あるデータストアの CDC イベントに含まれる変更を別のデータストアに適用し、両方のデータストアで変更が行われるようにするプロセスです。
CDC ハンドラーを使用して、Apache Kafka トピックに保存されている CDC イベントを MongoDB に複製します。CDC ハンドラーは、特定のCDC イベント プロデューサーからの CDC イベントを MongoDB への書込み (write) 操作に変換するプログラムです。
CDC イベント プロデューサーは、CDC イベントを生成するアプリケーションです。CDC イベント プロデューサーは、データストア、またはデータストアを監視し、データストアの変更に対応する CDC イベントを生成するアプリケーションです。
注意
MongoDB Change Streams は CDC アーキテクチャの例です。 変更ストリームの詳細については、「 MongoDBKafka ConnectorChange Streams」の ガイドを参照してください。
データを複製する方法のチュートリアル デモは、「変更データ キャプチャ ハンドラーによるデータの複製」チュートリアルをご覧ください。
CDC ハンドラーを指定する
次の構成オプションを使用して、シンク コネクタに CDC ハンドラーを指定できます。
change.data.capture.handler=<cdc handler class>
詳細については、「データ キャプチャ設定オプションの変更」を参照してください。
使用可能な CDC ハンドラー
シンク コネクタは、次の CDC イベント プロデューサー用の CDC ハンドラーを提供します。
上記のイベント プロデューサーの CDC ハンドラーを構成する方法については、以下のタブをクリックしてください。
以下のプロパティ ファイルでは、MongoDB 変更イベント ドキュメントを複製するように Sink Connector を構成します。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing mongodb change event documents> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
MongoDB CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。
Sink Connector は、これらのデータストアから発生した Debezium CDC イベントを複製できます。
MongoDB
Postgres
MySQL
次のタブをクリックすると、Debezium CDC ハンドラーを構成して、前述の各データストアから CDC イベントを複製する方法を確認できます。
以下のプロパティ ファイルは、MongoDB インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler
注意
2.0より前のバージョンの Debezium CDC を使用している場合は、 change.data.capture.handler
プロパティの値をcom.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler
に設定します。
Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。
以下のプロパティ ファイルは、Postgres インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler
Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。
以下のプロパティ ファイルは、MySQL インスタンスの変更に対応する Debezium CDC イベントを複製するように Sink Connector を構成します。
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler
Debezium CDC ハンドラーのソースコードを表示するには 、 MongoDB Kafka Connector ソースコード を参照してください。
注意
Debezium CDC ハンドラーのカスタマイズ
Debezium CDCハンドラーがデータストアからCDCイベントを複製できない場合は、DebeziumCdcHandler クラスを拡張してハンドラーをカスタマイズできます。カスタム CDC ハンドラーの詳細については、このガイドの「独自の CDC ハンドラーの作成」セクションを参照してください。
シンク コネクタは、次のデータストアから発生した Qlik Replicate CDC イベントを複製できます。
OracleDB
MySQL
Postgres
以下のプロパティ ファイルでは、Qlik Replicate CDC イベントを複製するように Sink Connector を構成します。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing qlik replicate cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler
Qlik Replicate CDC ハンドラーのソースコードを表示するには、MongoDB Kafka Connector ソースコードを参照してください。
注意
Qlik 複製 CDC ハンドラーのカスタマイズ
Qlik Replicate CDC ハンドラーがデータストアから CDC イベントを複製できない場合は、 QlikCdcHandler を拡張してハンドラーをカスタマイズできます クラス。カスタム CDC ハンドラーの詳細については、このガイドの「独自の CDC ハンドラーの作成」セクションを参照してください。
独自の CDC ハンドラーを作成する
事前に構築された CDC ハンドラーでユースケースに適合するものがない場合は、独自のハンドラーを作成できます。カスタム CDC ハンドラーは、CdcHandler
インターフェイスを実装する Java クラスです。
詳細については 、 CdcHandler インターフェースのソースコード を参照してください。
CDC ハンドラーの実装例については、「事前に構築された CDC ハンドラーのソースコード」を参照してください。
CDC ハンドラーの使用方法
カスタム CDC ハンドラーを使用するようにシンク コネクタを構成するには、次のアクションを実行する必要があります。
カスタム CDC ハンドラー クラスを JAR ファイルにコンパイルします。
コンパイルされた JAR をKafkaワーカーのクラスパス/プラグイン パスに追加します。 プラグイン パスの詳細については、 Confluent のドキュメントを参照してください。
注意
Kafka Connect はプラグインを分離してロードします。カスタム書込みストラテジーを展開する場合、connector JAR と CDC ハンドラー JAR の両方が同じパスにある必要があります。パスは次のようになります。
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar
Kafka Connect プラグインの詳細については、Confluent のこちらのガイドを参照してください。
change.data.capture.handler
構成設定でカスタム クラスを指定します。
クラスを JAR ファイルにコンパイルする方法については、Oracle のこちらのガイドを参照してください。