MongoDB Kafka Source Connector の利用開始
このチュートリアルに従って、MongoDB Kafka ソース コネクタを構成して変更ストリームからデータを読み取り、Apache Kafka トピックに公開する方法を学びます。
MongoDB Kafka Source Connector を使い始める
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
Source Connector の構成
次のコマンドを使用して、チュートリアル設定用にダウンロードしたチュートリアル Docker コンテナに対話型の shell セッションを作成します。
docker exec -it mongo1 /bin/bash
次のコマンドを使用して、 simplesource.json
というソース構成ファイルを作成します。
nano simplesource.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
作成した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。
cx simplesource.json
注意
cx
コマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
ソース コネクタが正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
変更イベントの作成
同じ shell で、次のコマンドを実行して、MongoDB shell であるmongosh
を使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test>
プロンプトで、次のコマンドを入力して新しいドキュメントを挿入します。
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
コマンドexit
を入力して MongoDB shell を終了します。
次のコマンドを使用して、 Kafka 環境のステータスを確認します。
status
上記のコマンドの出力には、ソース コネクタが変更イベントを受信した後に作成した新しいトピックが表示されています。
... "topic": "Tutorial1.orders", ...
次のコマンドを実行して、新しい Kafka トピックのデータのコンテンツを確認します。
kc Tutorial1.orders
注意
kc
コマンドは、 Kafka トピックの内容を出力するヘルパー スクリプトです。
上記のコマンドを実行すると、「キー」セクションと「値」セクションで整理された次の Kafka トピック データが表示されます。
出力の「値」セクションから、次の形式の JSON ドキュメントでハイライトされているように、 fullDocument
データを含むpayload
の部分を見つけることができます。
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
変更ストリームの再構成
変更ストリームによって作成されるイベントのメタデータを省略するには、 fullDocument
フィールドのみを返すようにします。
次のコマンドを使用して connector を停止します。
del mongo-simple-source
注意
del
コマンドは、 Kafka Connect REST API を呼び出してコネクタを停止するヘルパー スクリプトであり、次のコマンドと同等です。
curl -X DELETE connect:8083/connectors/<parameter>
次のコマンドを使用して、 simplesource.json
というソース構成ファイルを編集します。
nano simplesource.json
既存の構成を削除し、次の構成を追加して、ファイルを保存します。
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
更新した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。
cx simplesource.json
次のコマンドを使用して、 mongosh
を使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
プロンプトで、次のコマンドを入力して新しいドキュメントを挿入します。
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
次のコマンドを実行してmongosh
を終了します。
exit
次のコマンドを実行して、新しい Kafka トピックのデータのコンテンツを確認します。
kc Tutorial1.orders
「Value」ドキュメントのpayload
フィールドには、次のドキュメント データのみを含める必要があります。
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(任意)Docker コンテナを停止します
このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
Tip
その他のチュートリアル
MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。
docker-compose -p mongo-kafka down --rmi all
次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。
docker-compose -p mongo-kafka down
コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。
概要
このチュートリアルでは、 Kafka トピックに公開された変更ストリーム イベント データを変更するためのさまざまな構成を使用してソース コネクタを起動しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。