Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

既存のコレクションの時系列コレクションへの移行

項目一覧

  • 時系列コレクションへのコレクションの移行
  • 概要
  • 詳細

このチュートリアルに従って、MongoDB Kafka Connector を使用して既存の MongoDB コレクションを時系列コレクションに変換する方法を学びます。

時系列コレクションは時系列データを効率的に保存します。 時系列データは、時間間隔で取得された測定値、測定を説明するメタデータ、測定時間で構成されています。

connector を使用して MongoDB コレクションから時系列コレクションにデータを変換するには、次のタスクを実行する必要があります。

  1. コレクション内のすべてのドキュメントに共通の時間フィールドを識別します。

  2. 既存のコレクション データを Kafka トピックにコピーするようにソース コネクタを構成します。

  3. Kafka トピック データを時系列コレクションにコピーするように Sink Connector を構成します。

このチュートリアルでは、前述のタスクを実行して、株価データを コレクションから時系列コレクションに移行します。 時系列コレクションはデータをより効率的に保存してインデックスを作成し、 集計演算子 を使用して経時的な株価パフォーマンスを分析する機能を保持します。

1

Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。

2

Docker 環境で次のコマンドを実行してスクリプトを起動し、チュートリアルの MongoDB レプリカセットに作成された株式記号とその価格を含むサンプル コレクションを生成します。

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

データ ジェネレーターの実行を開始すると、次のような生成されたデータが表示されます。

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

別のターミナル ウィンドウで、次のコマンドを使用して、チュートリアル セット用にダウンロードしたチュートリアル Docker コンテナにインタラクティブ shell セッションを作成します。

docker exec -it mongo1 /bin/bash

次のコマンドを使用して、 stock-source.jsonというソース構成ファイルを作成します。

nano stock-source.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

この構成では、コネクターは、 PriceData MongoDB コレクションからmarketdata.Stocks.PriceData Kafka トピックに既存のデータをコピーし、完了すると、そのコレクションに挿入される将来のデータをコピーするように指示します。

作成した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。

cx stock-source.json

注意

cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

ソース コネクタが正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

ソース コネクタが起動したら、次のコマンドを実行して、 Kafka トピックがコレクション データを受信したことを確認します。

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

出力には、次のようなソース コネクタによって公開されるトピック データが表示されます。

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

CTRL+Cと入力して、 kafkacatを終了できます。

4

Kafka トピックからデータを読み取り、 Stocksという名前のデータベース内のStockDataMigrateという名前の時系列コレクションに書き込むように Sink Connector を構成します。

次のコマンドを使用して、 stock-sink.jsonという Sink 構成ファイルを作成します。

nano stock-sink.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

Tip

上記の Sink Connector 構成では、 時間フィールド日付形式変換 が使用されています。 あるいは、 TimestampConverter単一メッセージ変換(SM)を使用して、 tx_timeフィールドをStringからISODateに変換することもできます。 TimestampConverter SNT を使用する場合、 Kafka トピックのデータのスキーマを定義する必要があります。

TimestampConverter付与 の使用方法については、 TimestampConvert を参照してください。 Confluent のドキュメント。

更新した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。

cx stock-sink.json

Sink Connector がトピック データの処理を完了すると、 StockDataMigrate時系列コレクション内のドキュメントには、 ISODate型の値を持つtx_timeフィールドが含まれます。

5

Sink Connector がトピック データの処理を完了すると、 StockDataMigrate時系列コレクションには、 PriceDataコレクションのすべてのマーケット データが含まれます。

MongoDB でデータを表示するには、次のコマンドを実行して、 mongoshを使用してレプリカセットに接続します。

mongosh "mongodb://mongo1"

プロンプトで次のコマンドを入力して、 Stocks.StockDataMigrate MongoDB 名前空間内のすべてのドキュメントを取得します。

use Stocks
db.StockDataMigrate.find()

次のドキュメントのような、 コマンドから返されたドキュメントのリストが表示されます。

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

このチュートリアルでは、MongoDB コレクションにデータを定期的に書込む株式ティッカー データ ジェネレーターを作成しました。 Kafka トピックにデータをコピーするようにソース コネクタを構成し、そのデータを新しい MongoDB 時系列コレクションに書込むように Sink Connector を構成しました。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。

戻る

変更データ キャプチャ ハンドラーによるデータの複製