Kafka Connectorクイック スタート
項目一覧
Overview
このガイドでは、MongoDB Kafka Connector を構成して、MongoDB と Apache Kafka 間でデータを送信する方法について説明します。
このガイドを完了すると、 Kafka Connect REST APIを使用してMongoDB Kafka Connector を構成して、 MongoDBからデータを読み取り、 Kafkaトピックに書込み、またKafkaトピックからデータを読み取り、 MongoDBに書込む方法を理解する必要があります。
このガイドの手順を完了するには、 サンプル データ パイプライン の構築に必要なサービスを含むコンテナ化された開発環境である サンドボックス をダウンロードして操作する必要があります。
サンドボックスとサンプル データ パイプラインを設定するには、次のセクションをお読みください。
注意
このガイドを完了したら、 サンドボックスの削除セクションの手順に従って環境を削除できます。
必要なパッケージをインストールします
次のパッケージをダウンロードしてインストールします。
サンドボックスは、便宜上と一貫性のために Docker を使用します。 Apache Kafka の配置オプションの詳細については、次のリソースを参照してください。
サンドボックスをダウンロード
このチュートリアルでは、サンプル データ パイプラインを構築するために必要なサービスを含むサンドボックスを作成しました。
サンドボックスをダウンロードするには、チュートリアル リポジトリを開発環境にクローンします。 次に、クイックスタート チュートリアルに対応するディレクトリに移動します。 bash または同様の shell を使用する場合は、次のコマンドを使用します。
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
サンドボックスを起動する
サンドボックスは、Docker コンテナで次のサービスを開始します。
レプリカセットとして構成された MongoDB
Apache Kafka
インストールされた MongoDB Kafka Connector を使用した Kafka Connect
Apache Kafka の構成を管理する Apache Zooker
サンドボックスを起動するには、チュートリアル ディレクトリから次のコマンドを実行します。
docker compose -p mongo-kafka up -d --force-recreate
サンドボックスを起動すると、Docker は実行に必要なイメージをすべてダウンロードします。
注意
ダウンロードにかかる時間は?
このチュートリアルの Docker イメージには合計で約 2.4 GB のスペースが必要です。 次のリストは、さまざまなインターネット速度でイメージをダウンロードするのにかかる時間を示しています。
40メガビット/秒: 8分
20メガビット/秒: 16分
10メガビット/秒: 32 分
Docker がイメージをダウンロードしてビルドすると、開発環境に次の出力が表示されます。
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
注意
ポート マッピング
サンドボックスは、次のサービスをホスト マシンのポートにマッピングします。
サンドボックス MongoDB サーバーはホストマシンのポート
35001
にマッピングされますサンドボックス Kafka Connect JTX サーバーはホストマシンのポート
35000
にマッピングされます
サンドボックスを起動するには、これらのポートが無料である必要があります。
コネクターの追加
サンプル データ パイプラインを完了するには、 Kafka Connect と MongoDB 間でデータを転送するためのコネクターを Kafka Connect に追加する必要があります。 MongoDB から Apache Kafka にデータを転送するためのソース コネクタを追加します。 Apache Kafka から MongoDB にデータを転送するためのSink Connectorを追加します。
サンドボックスにコネクターを追加するには、まず次のコマンドを使用して コンテナ内でインタラクティブshell Dockerを起動します。
docker exec -it mongo1 /bin/bash
shell セッションを開始すると、次のプロンプトが表示されます。
MongoDB Kafka Connector Sandbox $
ソースConnectorの追加
Docker コンテナの shell を使用して、 Kafka Connect REST API を使用してソース コネクタを追加します。
次の API リクエストは、次のプロパティで構成されたソース コネクタを追加します。
Kafka Connect がコネクタをインスタンス化するために使用する クラス
コネクタがデータを読み取る MongoDB レプリカセットの接続 URI、データベース、コレクション
コネクタが MongoDB から読み取る挿入ドキュメントに値
"MongoDB Kafka Connector"
を持つフィールドtravel
を追加する集計パイプライン
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
注意
「接続に失敗しました」というメッセージが表示されるのはなぜですか?
Kafka Connect REST API が起動するまでに最大 3 分かかります。 次のエラーが表示された場合は、3 分待ってから前のコマンドを再度実行してください。
... curl: (7) Failed to connect to connect port 8083: Connection refused
ソース コネクタを追加したことを確認するには、次のコマンドを実行します。
curl -X GET http://connect:8083/connectors
上記のコマンドは実行中の connector の名前を出力します。
["mongo-source"]
ソース コネクタ プロパティの詳細については、 ソースConnector構成プロパティ のページを参照してください。
集計パイプラインの詳細については、「 集計パイプライン 」に関する MongoDB マニュアルのページを参照してください。
Sink Connector の追加
Docker コンテナの shell を使用して、 Kafka Connect REST API を使用して Sink Connector を追加します。
次の API リクエストは、次のプロパティで構成された Sink Connector を追加します。
Kafka Connect がコネクタをインスタンス化するために使用する クラス
コネクタがデータを書き込む MongoDB レプリカセットの接続 URI、データベース、コレクション
コネクタがデータを読み取る Apache Kafka トピック
MongoDB 変更イベント ドキュメントの 変更データ キャプチャ ハンドラー
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
source コネクタと Sink Connector の両方を追加したことを確認するには、次のコマンドを実行します。
curl -X GET http://connect:8083/connectors
上記のコマンドは実行中の connector の名前を出力します。
["mongo-source", "mongo-sink"]
Sink Connector プロパティの詳細については、 Sink Connector 構成プロパティ のページを参照してください。
変更データ キャプチャ イベントの詳細については、「変更データ キャプチャ ハンドラー」のガイドを参照してください。
コネクターを介してドキュメントの内容を送信
コネクタを介してドキュメントの内容を送信するには、ソース コネクタがデータを読み取る MongoDB コレクションにドキュメントを挿入します。
コレクションに新しいドキュメントを挿入するには、次のコマンドを使用して Docker コンテナの shell から MongoDB shell を入力します。
mongosh mongodb://mongo1:27017/?replicaSet=rs0
上記のコマンドを実行すると、次のプロンプトが表示されます。
rs0 [primary] test>
MongoDB shellから、次のコマンドを使用して、 quickstart
データベースのsampleData
コレクションにドキュメントを挿入します。
use quickstart db.sampleData.insertOne({"hello":"world"})
sampleData
コレクションにドキュメントを挿入したら、コネクタが変更を処理していることを確認します。 次のコマンドを使用して、 topicData
コレクションの内容を確認します。
db.topicData.find()
次のような出力が表示されます。
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
次のコマンドを使用して MongoDB shell を終了します。
exit
サンドボックスを削除する
開発環境のリソースを節約するには、サンドボックスを削除します。
サンドボックスを削除する前に、次のコマンドを実行して Docker コンテナ内の shell セッションを終了します。
exit
Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB のサンドボックスを再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、Docker コンテナとイメージをサンドボックスから削除します。
docker-compose -p mongo-kafka down --rmi all
次の shell コマンドを実行して、Docker コンテナを削除しますが、サンドボックスのイメージは保持します。
docker-compose -p mongo-kafka down
次のステップ
MongoDB Kafka Connector をインストールする方法については、「 MongoDB Kafka Connector のインストール」のガイドを参照してください。
Apache Kafka から MongoDB にデータを処理して移動する方法の詳細については、 Sink Connectorのガイド を参照してください。
MongoDBからApache Kafkaにデータを処理して移動する方法の詳細については、 「ソースConnector 」 のガイドを参照してください。