変更データ キャプチャ ハンドラーによるデータの複製
項目一覧
Overview
このチュートリアルに従って、変更データ キャプチャ(CDC)ハンドラーを使用して MongoDB Kafka Connector でデータを複製する方法を学びます。 CDC ハンドラーは、CDC イベントを MongoDB への書込み (write) 操作に変換するアプリケーションです。 あるデータストアの変更を別のデータストアに複製する必要がある場合は、 CDC ハンドラーを使用します。
このチュートリアルでは、MongoDB Kafka ソース コネクタと シンク コネクタを構成して実行し、CDC を使用して 2 つの MongoDB コレクションに同じドキュメントを含めます。 Source Connector は元のコレクションの変更ストリーム データを Kafka トピックに書込み、Sink Connector は Kafka トピック データをターゲットの MongoDB コレクションに書込みます。
CDC ハンドラーの動作方法について詳しくは、 変更データ キャプチャ ハンドラーのガイドをご覧ください。
CDC ハンドラーによるデータの複製
チュートリアル設定を完了する
Kafka Connector チュートリアル設定の手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
インタラクティブ shell の開始
個別の のDocker コンテナ上で 2 つの対話型 shellWindows を起動します。チュートリアルでは、shell を使用してさまざまなタスクを実行、観察できます。
対話型 shell を起動するには、ターミナルから次のコマンドを実行します。
docker exec -it mongo1 /bin/bash
このチュートリアル全体では、このインタラクティブ shell をCDCShell1として参照します。
対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。
docker exec -it mongo1 /bin/bash
このチュートリアル全体では、このインタラクティブ shell をCDCShell2として参照します。
画面上の 2 つのWindowsを配置して、両方が表示されるようにし、リアルタイム更新を確認します。
CDCShell1を使用して connector を構成し、 Kafka トピックを監視します。 CDCShell2を使用して、MongoDB で書込み (write) 操作を実行します。
Source Connector の構成
CDCShell1で、 CDCTutorial.Source
MongoDB 名前空間から読み取り、 CDCTutorial.Source
Kafka トピックに書込むようにソース コネクタを構成します。
次のコマンドを使用して、 cdc-source.json
という構成ファイルを作成します。
nano cdc-source.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-cdc-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Source" } }
作成した構成ファイルを使用してソース コネクタを起動するには、 CDCShell1で次のコマンドを実行します。
cx cdc-source.json
注意
cx
コマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
ソース コネクタが正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ...
Sink Connector の設定
CDCShell1で、 CDCTutorial.Source
Kafka トピックからCDCTutorial.Destination
MongoDB 名前空間にデータをコピーするように Sink Connector を構成します。
次のコマンドを使用して、 cdc-sink.json
という構成ファイルを作成します。
nano cdc-sink.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-cdc-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "CDCTutorial.Source", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Destination" } }
作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx cdc-sink.json
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
Sink Connector が正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ...
Kafka トピックの監視
CDCShell1で、受信イベントの Kafka トピックを監視します。 次のコマンドを実行してkafkacat
アプリケーションを起動し、トピックに公開されたデータを出力します。
kc CDCTutorial.Source
注意
kc
コマンドは、 Kafka に接続し、指定されたトピックの出力をフォーマットするためのオプションを持つkafkacat
アプリケーションを呼び出す、チュートリアル開発環境に含まれるカスタム スクリプトです。
起動すると、次の出力が表示されます。これは、現在読み込むデータがないことを示しています。
% Reached end of topic CDCTutorial.Source [0] at offset 0
ソースにデータを書き込み、データフローを監視する
CDCShell2で、次のコマンドを実行して MongoDB shell であるmongosh
を使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test>
プロンプトで次のコマンドを入力して、新しいドキュメントをCDCTutorial.Source
MongoDB 名前空間に挿入します。
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" });
MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
ソース コネクタは変更を取得し、 Kafka トピックに公開します。 CDCShell1ウィンドウに次のトピック メッセージが表示されます。
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8260..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } }, "wallTime": { "$date": "..." }, "fullDocument": { "_id": { "$oid": "600b38ad..." }, "proclaim": "Hello World!" }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "600b38a..." } } } }
Sink Connector は Kafka メッセージを選択し、データを MongoDB にシンクします。 CDCShell2で起動した MongoDB shell で次のコマンドを実行すると、MongoDB のCDCTutorial.Destination
名前空間からドキュメントを検索できます。
db.Destination.find()
次のドキュメントが結果として返されます。
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World' } ]
(任意)追加変更の生成
MongoDB shell から次のコマンドを実行して、 CDCTutorial.Source
名前空間からドキュメントを削除してみてください。
db.Source.deleteMany({})
CDCShell1ウィンドウに次のトピック メッセージが表示されます。
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8261...." }, ... "operationType": "delete", "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "6138..." } } } }
コレクション内の現在のドキュメント数を取得するには、次のコマンドを実行します。
db.Destination.count()
これにより、コレクションが空であることを示す次の出力が返されます。
0
次のコマンドを実行して、MongoDB shell を終了します。
exit
概要
このチュートリアルでは、ソース コネクタを設定して、MongoDB コレクションに対する変更をキャプチャし、Apache Kafka に送信します。 また、MongoDB CDC ハンドラーで Sink Connector を構成し、Apache Kafka から MongoDB コレクションにデータを移動しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。