Docs Menu
Docs Home
/
MongoDB Kafka Connector

Kafka Connectorクイック スタート

項目一覧

  • Overview
  • 必要なパッケージをインストールします
  • サンドボックスをダウンロード
  • サンドボックスを起動する
  • コネクターの追加
  • ソースConnectorの追加
  • Sink Connector の追加
  • コネクターを介してドキュメントの内容を送信
  • サンドボックスを削除する
  • 次のステップ

このガイドでは、MongoDB Kafka Connector を構成して、MongoDB と Apache Kafka 間でデータを送信する方法について説明します。

このガイドを完了すると、 Kafka Connect REST APIを使用してMongoDB Kafka Connector を構成して、 MongoDBからデータを読み取り、 Kafkaトピックに書込み、またKafkaトピックからデータを読み取り、 MongoDBに書込む方法を理解する必要があります。

このガイドの手順を完了するには、 サンプル データ パイプライン の構築に必要なサービスを含むコンテナ化された開発環境である サンドボックス をダウンロードして操作する必要があります。

サンドボックスとサンプル データ パイプラインを設定するには、次のセクションをお読みください。

注意

このガイドを完了したら、 サンドボックスの削除セクションの手順に従って環境を削除できます。

次のパッケージをダウンロードしてインストールします。

Tip

Docker のドキュメントを読む

このガイドでは、次の Docker 固有の用語を使用します。

Docker 公式の 「はじめに ガイド」から Docker の詳細を学びます。

サンドボックスは、便宜上と一貫性のために Docker を使用します。 Apache Kafka の配置オプションの詳細については、次のリソースを参照してください。

このチュートリアルでは、サンプル データ パイプラインを構築するために必要なサービスを含むサンドボックスを作成しました。

サンドボックスをダウンロードするには、チュートリアル リポジトリを開発環境にクローンします。 次に、クイックスタート チュートリアルに対応するディレクトリに移動します。 bash または同様の shell を使用する場合は、次のコマンドを使用します。

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

サンドボックスは、Docker コンテナで次のサービスを開始します。

  • レプリカセットとして構成された MongoDB

  • Apache Kafka

  • インストールされた MongoDB Kafka Connector を使用した Kafka Connect

  • Apache Kafka の構成を管理する Apache Zooker

サンドボックスを起動するには、チュートリアル ディレクトリから次のコマンドを実行します。

docker compose -p mongo-kafka up -d --force-recreate

サンドボックスを起動すると、Docker は実行に必要なイメージをすべてダウンロードします。

注意

ダウンロードにかかる時間は?

このチュートリアルの Docker イメージには合計で約 2.4 GB のスペースが必要です。 次のリストは、さまざまなインターネット速度でイメージをダウンロードするのにかかる時間を示しています。

  • 40メガビット/秒: 8分

  • 20メガビット/秒: 16分

  • 10メガビット/秒: 32 分

Docker がイメージをダウンロードしてビルドすると、開発環境に次の出力が表示されます。

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

注意

ポート マッピング

サンドボックスは、次のサービスをホスト マシンのポートにマッピングします。

  • サンドボックス MongoDB サーバーはホストマシンのポート 35001にマッピングされます

  • サンドボックス Kafka Connect JTX サーバーはホストマシンのポート35000にマッピングされます

サンドボックスを起動するには、これらのポートが無料である必要があります。

サンプル データ パイプラインを完了するには、 Kafka Connect と MongoDB 間でデータを転送するためのコネクターを Kafka Connect に追加する必要があります。 MongoDB から Apache Kafka にデータを転送するためのソース コネクタを追加します。 Apache Kafka から MongoDB にデータを転送するためのSink Connectorを追加します。

サンドボックスにコネクターを追加するには、まず次のコマンドを使用して コンテナ内でインタラクティブshell Dockerを起動します。

docker exec -it mongo1 /bin/bash

shell セッションを開始すると、次のプロンプトが表示されます。

MongoDB Kafka Connector Sandbox $

Docker コンテナの shell を使用して、 Kafka Connect REST API を使用してソース コネクタを追加します。

次の API リクエストは、次のプロパティで構成されたソース コネクタを追加します。

  • Kafka Connect がコネクタをインスタンス化するために使用する クラス

  • コネクタがデータを読み取る MongoDB レプリカセットの接続 URI、データベース、コレクション

  • コネクタが MongoDB から読み取る挿入ドキュメントに値"MongoDB Kafka Connector"を持つフィールドtravelを追加する集計パイプライン

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

注意

「接続に失敗しました」というメッセージが表示されるのはなぜですか?

Kafka Connect REST API が起動するまでに最大 3 分かかります。 次のエラーが表示された場合は、3 分待ってから前のコマンドを再度実行してください。

...
curl: (7) Failed to connect to connect port 8083: Connection refused

ソース コネクタを追加したことを確認するには、次のコマンドを実行します。

curl -X GET http://connect:8083/connectors

上記のコマンドは実行中の connector の名前を出力します。

["mongo-source"]

ソース コネクタ プロパティの詳細については、 ソースConnector構成プロパティ のページを参照してください。

集計パイプラインの詳細については、「 集計パイプライン 」に関する MongoDB マニュアルのページを参照してください。

Docker コンテナの shell を使用して、 Kafka Connect REST API を使用して Sink Connector を追加します。

次の API リクエストは、次のプロパティで構成された Sink Connector を追加します。

  • Kafka Connect がコネクタをインスタンス化するために使用する クラス

  • コネクタがデータを書き込む MongoDB レプリカセットの接続 URI、データベース、コレクション

  • コネクタがデータを読み取る Apache Kafka トピック

  • MongoDB 変更イベント ドキュメントの 変更データ キャプチャ ハンドラー

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

source コネクタと Sink Connector の両方を追加したことを確認するには、次のコマンドを実行します。

curl -X GET http://connect:8083/connectors

上記のコマンドは実行中の connector の名前を出力します。

["mongo-source", "mongo-sink"]

Sink Connector プロパティの詳細については、 Sink Connector 構成プロパティ のページを参照してください。

変更データ キャプチャ イベントの詳細については、「変更データ キャプチャ ハンドラー」のガイドを参照してください。

コネクタを介してドキュメントの内容を送信するには、ソース コネクタがデータを読み取る MongoDB コレクションにドキュメントを挿入します。

コレクションに新しいドキュメントを挿入するには、次のコマンドを使用して Docker コンテナの shell から MongoDB shell を入力します。

mongosh mongodb://mongo1:27017/?replicaSet=rs0

上記のコマンドを実行すると、次のプロンプトが表示されます。

rs0 [primary] test>

MongoDB shellから、次のコマンドを使用して、 quickstartデータベースのsampleDataコレクションにドキュメントを挿入します。

use quickstart
db.sampleData.insertOne({"hello":"world"})

sampleDataコレクションにドキュメントを挿入したら、コネクタが変更を処理していることを確認します。 次のコマンドを使用して、 topicDataコレクションの内容を確認します。

db.topicData.find()

次のような出力が表示されます。

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

次のコマンドを使用して MongoDB shell を終了します。

exit

開発環境のリソースを節約するには、サンドボックスを削除します。

サンドボックスを削除する前に、次のコマンドを実行して Docker コンテナ内の shell セッションを終了します。

exit

Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB のサンドボックスを再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。

実行する削除タスクに対応するタブを選択します。

次の shell コマンドを実行して、Docker コンテナとイメージをサンドボックスから削除します。

docker-compose -p mongo-kafka down --rmi all

次の shell コマンドを実行して、Docker コンテナを削除しますが、サンドボックスのイメージは保持します。

docker-compose -p mongo-kafka down

MongoDB Kafka Connector をインストールする方法については、「 MongoDB Kafka Connector のインストール」のガイドを参照してください。

Apache Kafka から MongoDB にデータを処理して移動する方法の詳細については、 Sink Connectorのガイド を参照してください。

MongoDBからApache Kafkaにデータを処理して移動する方法の詳細については、 「ソースConnector 」 のガイドを参照してください。

戻る

新機能