Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Kafka Source Connector の利用開始

項目一覧

  • MongoDB Kafka Source Connector を使い始める
  • 概要
  • 詳細

このチュートリアルに従って、MongoDB Kafka ソース コネクタを構成して変更ストリームからデータを読み取り、Apache Kafka トピックに公開する方法を学びます。

1

Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。

2

次のコマンドを使用して、チュートリアル設定用にダウンロードしたチュートリアル Docker コンテナに対話型の shell セッションを作成します。

docker exec -it mongo1 /bin/bash

次のコマンドを使用して、 simplesource.jsonというソース構成ファイルを作成します。

nano simplesource.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

作成した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。

cx simplesource.json

注意

cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

ソース コネクタが正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
3

同じ shell で、次のコマンドを実行して、MongoDB shell であるmongoshを使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

正常に接続すると、次の MongoDB shell プロンプトが表示されます。

rs0 [direct: primary] test>

プロンプトで、次のコマンドを入力して新しいドキュメントを挿入します。

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

コマンドexitを入力して MongoDB shell を終了します。

次のコマンドを使用して、 Kafka 環境のステータスを確認します。

status

上記のコマンドの出力には、ソース コネクタが変更イベントを受信した後に作成した新しいトピックが表示されています。

...
"topic": "Tutorial1.orders",
...

次のコマンドを実行して、新しい Kafka トピックのデータのコンテンツを確認します。

kc Tutorial1.orders

注意

kcコマンドは、 Kafka トピックの内容を出力するヘルパー スクリプトです。

上記のコマンドを実行すると、「キー」セクションと「値」セクションで整理された次の Kafka トピック データが表示されます。

出力の「値」セクションから、次の形式の JSON ドキュメントでハイライトされているように、 fullDocumentデータを含むpayloadの部分を見つけることができます。

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
4

変更ストリームによって作成されるイベントのメタデータを省略するには、 fullDocumentフィールドのみを返すようにします。

次のコマンドを使用して connector を停止します。

del mongo-simple-source

注意

delコマンドは、 Kafka Connect REST API を呼び出してコネクタを停止するヘルパー スクリプトであり、次のコマンドと同等です。

curl -X DELETE connect:8083/connectors/<parameter>

次のコマンドを使用して、 simplesource.jsonというソース構成ファイルを編集します。

nano simplesource.json

既存の構成を削除し、次の構成を追加して、ファイルを保存します。

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

更新した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。

cx simplesource.json

次のコマンドを使用して、 mongoshを使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

プロンプトで、次のコマンドを入力して新しいドキュメントを挿入します。

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

次のコマンドを実行してmongoshを終了します。

exit

次のコマンドを実行して、新しい Kafka トピックのデータのコンテンツを確認します。

kc Tutorial1.orders

「Value」ドキュメントのpayloadフィールドには、次のドキュメント データのみを含める必要があります。

{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
5

このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。

Tip

その他のチュートリアル

MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。

実行する削除タスクに対応するタブを選択します。

次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。

docker-compose -p mongo-kafka down --rmi all

次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。

docker-compose -p mongo-kafka down

コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。

このチュートリアルでは、 Kafka トピックに公開された変更ストリーム イベント データを変更するためのさまざまな構成を使用してソース コネクタを起動しました。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。

  • Source Connector 構成プロパティ

  • Kafka Connect REST API

戻る

MongoDB Change Streams を探索