Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

変更データ キャプチャ ハンドラーによるデータの複製

項目一覧

  • Overview
  • CDC ハンドラーによるデータの複製
  • チュートリアル設定を完了する
  • インタラクティブ shell の開始
  • Source Connector の構成
  • Sink Connector の設定
  • Kafka トピックの監視
  • ソースにデータを書き込み、データフローを監視する
  • (任意)追加変更の生成
  • 概要
  • 詳細

このチュートリアルに従って、変更データ キャプチャ(CDC)ハンドラーを使用して MongoDB Kafka Connector でデータを複製する方法を学びます。 CDC ハンドラーは、CDC イベントを MongoDB への書込み (write) 操作に変換するアプリケーションです。 あるデータストアの変更を別のデータストアに複製する必要がある場合は、 CDC ハンドラーを使用します。

このチュートリアルでは、MongoDB Kafka ソース コネクタと シンク コネクタを構成して実行し、CDC を使用して 2 つの MongoDB コレクションに同じドキュメントを含めます。 Source Connector は元のコレクションの変更ストリーム データを Kafka トピックに書込み、Sink Connector は Kafka トピック データをターゲットの MongoDB コレクションに書込みます。

CDC ハンドラーの動作方法について詳しくは、 変更データ キャプチャ ハンドラーのガイドをご覧ください。

1

Kafka Connector チュートリアル設定の手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。

2

個別の のDocker コンテナ上で 2 つの対話型 shellWindows を起動します。チュートリアルでは、shell を使用してさまざまなタスクを実行、観察できます。

対話型 shell を起動するには、ターミナルから次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をCDCShell1として参照します。

対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をCDCShell2として参照します。

画面上の 2 つのWindowsを配置して、両方が表示されるようにし、リアルタイム更新を確認します。

CDCShell1を使用して connector を構成し、 Kafka トピックを監視します。 CDCShell2を使用して、MongoDB で書込み (write) 操作を実行します。

3

CDCShell1で、 CDCTutorial.Source MongoDB 名前空間から読み取り、 CDCTutorial.Source Kafka トピックに書込むようにソース コネクタを構成します。

次のコマンドを使用して、 cdc-source.jsonという構成ファイルを作成します。

nano cdc-source.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

作成した構成ファイルを使用してソース コネクタを起動するには、 CDCShell1で次のコマンドを実行します。

cx cdc-source.json

注意

cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

ソース コネクタが正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1で、 CDCTutorial.Source Kafka トピックからCDCTutorial.Destination MongoDB 名前空間にデータをコピーするように Sink Connector を構成します。

次のコマンドを使用して、 cdc-sink.jsonという構成ファイルを作成します。

nano cdc-sink.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。

cx cdc-sink.json

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

Sink Connector が正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1で、受信イベントの Kafka トピックを監視します。 次のコマンドを実行してkafkacatアプリケーションを起動し、トピックに公開されたデータを出力します。

kc CDCTutorial.Source

注意

kcコマンドは、 Kafka に接続し、指定されたトピックの出力をフォーマットするためのオプションを持つkafkacatアプリケーションを呼び出す、チュートリアル開発環境に含まれるカスタム スクリプトです。

起動すると、次の出力が表示されます。これは、現在読み込むデータがないことを示しています。

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2で、次のコマンドを実行して MongoDB shell であるmongoshを使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

正常に接続すると、次の MongoDB shell プロンプトが表示されます。

rs0 [direct: primary] test>

プロンプトで次のコマンドを入力して、新しいドキュメントをCDCTutorial.Source MongoDB 名前空間に挿入します。

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

ソース コネクタは変更を取得し、 Kafka トピックに公開します。 CDCShell1ウィンドウに次のトピック メッセージが表示されます。

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

Sink Connector は Kafka メッセージを選択し、データを MongoDB にシンクします。 CDCShell2で起動した MongoDB shell で次のコマンドを実行すると、MongoDB のCDCTutorial.Destination名前空間からドキュメントを検索できます。

db.Destination.find()

次のドキュメントが結果として返されます。

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

MongoDB shell から次のコマンドを実行して、 CDCTutorial.Source名前空間からドキュメントを削除してみてください。

db.Source.deleteMany({})

CDCShell1ウィンドウに次のトピック メッセージが表示されます。

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

コレクション内の現在のドキュメント数を取得するには、次のコマンドを実行します。

db.Destination.count()

これにより、コレクションが空であることを示す次の出力が返されます。

0

次のコマンドを実行して、MongoDB shell を終了します。

exit

このチュートリアルでは、ソース コネクタを設定して、MongoDB コレクションに対する変更をキャプチャし、Apache Kafka に送信します。 また、MongoDB CDC ハンドラーで Sink Connector を構成し、Apache Kafka から MongoDB コレクションにデータを移動しました。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。

戻る

MongoDB Kafka Sink Connector の利用開始