Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Kafka Sink Connector の利用開始

項目一覧

  • MongoDB Kafka Sink Connector を使い始める
  • 概要
  • 詳細

このチュートリアルに従って、MongoDB Kafka Sink Connector を構成して Apache Kafka トピックからデータを読み取り、MongoDB コレクションに書込む方法を学びます。

1

Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。

2

次のコマンドを使用して、チュートリアル Docker コンテナに対話型の shell セッションを作成します。

docker exec -it mongo1 /bin/bash

次のコマンドを使用して、 simplesink.jsonというソース構成ファイルを作成します。

nano simplesink.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "Tutorial2.pets",
"connection.uri": "mongodb://mongo1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "Tutorial2",
"collection": "pets"
}
}

注意

構成プロパティで強調表示された行は、 Kafka からのデータを変換する方法をコネクタに指示する変換コマンド を指定します。

作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。

cx simplesink.json

注意

cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。

curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

Sink Connector が正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
Currently configured connectors
[
"mongo-tutorial-sink"
]
...
3

同じ shell で、 Kafka トピックにデータを書込む Python スクリプトを作成します。

nano kafkawrite.py

次のコードを ファイルに貼り付け、変更を保存します。

from kafka import KafkaProducer
import json
from json import dumps
p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8'))
data = {'name': 'roscoe'}
p.send('Tutorial2.pets', value = data)
p.flush()

Python スクリプトを実行します。

python3 kafkawrite.py
4

同じ shell で、次のコマンドを実行して、MongoDB shell であるmongoshを使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

正常に接続すると、次の MongoDB shell プロンプトが表示されます。

rs0 [direct: primary] test>

プロンプトで次のコマンドを入力して、 Tutorial2.pets MongoDB 名前空間内のすべてのドキュメントを取得します。

use Tutorial2
db.pets.find()

次のドキュメントが結果として返されます。

{ _id: ObjectId("62659..."), name: 'roscoe' }

コマンドexitを入力して MongoDB shell を終了します。

5

このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。

Tip

その他のチュートリアル

MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。

実行する削除タスクに対応するタブを選択します。

次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。

docker-compose -p mongo-kafka down --rmi all

次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。

docker-compose -p mongo-kafka down

コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。

このチュートリアルでは、 Kafka トピックから MongoDB クラスターのコレクションにデータを保存するように Sink Connector を構成しました。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。

  • Sink Connector 構成プロパティ

  • Kafka Connector 変換の概要

  • Kafka Connect REST API

戻る

MongoDB Kafka Source Connector の利用開始