MongoDB Kafka Sink Connector の利用開始
このチュートリアルに従って、MongoDB Kafka Sink Connector を構成して Apache Kafka トピックからデータを読み取り、MongoDB コレクションに書込む方法を学びます。
MongoDB Kafka Sink Connector を使い始める
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
Sink Connector の設定
次のコマンドを使用して、チュートリアル Docker コンテナに対話型の shell セッションを作成します。
docker exec -it mongo1 /bin/bash
次のコマンドを使用して、 simplesink.json
というソース構成ファイルを作成します。
nano simplesink.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial2.pets", "connection.uri": "mongodb://mongo1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial2", "collection": "pets" } }
注意
構成プロパティで強調表示された行は、 Kafka からのデータを変換する方法をコネクタに指示する変換コマンド を指定します。
作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx simplesink.json
注意
cx
コマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
Sink Connector が正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ...
Kafka トピックへのデータの書込み (write)
同じ shell で、 Kafka トピックにデータを書込む Python スクリプトを作成します。
nano kafkawrite.py
次のコードを ファイルに貼り付け、変更を保存します。
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush()
Python スクリプトを実行します。
python3 kafkawrite.py
MongoDB コレクションのデータの表示
同じ shell で、次のコマンドを実行して、MongoDB shell であるmongosh
を使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test>
プロンプトで次のコマンドを入力して、 Tutorial2.pets
MongoDB 名前空間内のすべてのドキュメントを取得します。
use Tutorial2 db.pets.find()
次のドキュメントが結果として返されます。
{ _id: ObjectId("62659..."), name: 'roscoe' }
コマンドexit
を入力して MongoDB shell を終了します。
(任意)Docker コンテナを停止します
このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
Tip
その他のチュートリアル
MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。
docker-compose -p mongo-kafka down --rmi all
次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。
docker-compose -p mongo-kafka down
コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。
概要
このチュートリアルでは、 Kafka トピックから MongoDB クラスターのコレクションにデータを保存するように Sink Connector を構成しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。