既存のコレクションの時系列コレクションへの移行
このチュートリアルに従って、MongoDB Kafka Connector を使用して既存の MongoDB コレクションを時系列コレクションに変換する方法を学びます。
時系列コレクションは時系列データを効率的に保存します。 時系列データは、時間間隔で取得された測定値、測定を説明するメタデータ、測定時間で構成されています。
connector を使用して MongoDB コレクションから時系列コレクションにデータを変換するには、次のタスクを実行する必要があります。
コレクション内のすべてのドキュメントに共通の時間フィールドを識別します。
既存のコレクション データを Kafka トピックにコピーするようにソース コネクタを構成します。
Kafka トピック データを時系列コレクションにコピーするように Sink Connector を構成します。
このチュートリアルでは、前述のタスクを実行して、株価データを コレクションから時系列コレクションに移行します。 時系列コレクションはデータをより効率的に保存してインデックスを作成し、 集計演算子 を使用して経時的な株価パフォーマンスを分析する機能を保持します。
時系列コレクションへのコレクションの移行
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
サンプル データの生成
Docker 環境で次のコマンドを実行してスクリプトを起動し、チュートリアルの MongoDB レプリカセットに作成された株式記号とその価格を含むサンプル コレクションを生成します。
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
データ ジェネレーターの実行を開始すると、次のような生成されたデータが表示されます。
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ...
Source Connector の構成
別のターミナル ウィンドウで、次のコマンドを使用して、チュートリアル セット用にダウンロードしたチュートリアル Docker コンテナにインタラクティブ shell セッションを作成します。
docker exec -it mongo1 /bin/bash
次のコマンドを使用して、 stock-source.json
というソース構成ファイルを作成します。
nano stock-source.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-source-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": "true", "connection.uri": "mongodb://mongo1", "topic.prefix": "marketdata", "database": "Stocks", "collection": "PriceData", "copy.existing": "true" } }
この構成では、コネクターは、 PriceData
MongoDB コレクションからmarketdata.Stocks.PriceData
Kafka トピックに既存のデータをコピーし、完了すると、そのコレクションに挿入される将来のデータをコピーするように指示します。
作成した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。
cx stock-source.json
注意
cx
コマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
ソース コネクタが正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ...
ソース コネクタが起動したら、次のコマンドを実行して、 Kafka トピックがコレクション データを受信したことを確認します。
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
出力には、次のようなソース コネクタによって公開されるトピック データが表示されます。
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
CTRL+C
と入力して、 kafkacat
を終了できます。
Sink Connector の設定
Kafka トピックからデータを読み取り、 Stocks
という名前のデータベース内のStockDataMigrate
という名前の時系列コレクションに書き込むように Sink Connector を構成します。
次のコマンドを使用して、 stock-sink.json
という Sink 構成ファイルを作成します。
nano stock-sink.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-sink-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "marketdata.Stocks.PriceData", "connection.uri": "mongodb://mongo1", "database": "Stocks", "collection": "StockDataMigrate", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield": "tx_time", "timeseries.timefield.auto.convert": "true", "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
Tip
上記の Sink Connector 構成では、 時間フィールド日付形式変換 が使用されています。 あるいは、 TimestampConverter
単一メッセージ変換(SM)を使用して、 tx_time
フィールドをString
からISODate
に変換することもできます。 TimestampConverter
SNT を使用する場合、 Kafka トピックのデータのスキーマを定義する必要があります。
TimestampConverter
付与 の使用方法については、 TimestampConvert を参照してください。 Confluent のドキュメント。
更新した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx stock-sink.json
Sink Connector がトピック データの処理を完了すると、 StockDataMigrate
時系列コレクション内のドキュメントには、 ISODate
型の値を持つtx_time
フィールドが含まれます。
時系列コレクション データの検証
Sink Connector がトピック データの処理を完了すると、 StockDataMigrate
時系列コレクションには、 PriceData
コレクションのすべてのマーケット データが含まれます。
MongoDB でデータを表示するには、次のコマンドを実行して、 mongosh
を使用してレプリカセットに接続します。
mongosh "mongodb://mongo1"
プロンプトで次のコマンドを入力して、 Stocks.StockDataMigrate
MongoDB 名前空間内のすべてのドキュメントを取得します。
use Stocks db.StockDataMigrate.find()
次のドキュメントのような、 コマンドから返されたドキュメントのリストが表示されます。
{ tx_time: ISODate("2022-05-25T21:16:35.983Z"), _id: ObjectId("628e9..."), symbol: 'FAV', price: 18.43, company_name: 'FUZZY ATTACK VENTURES' }
概要
このチュートリアルでは、MongoDB コレクションにデータを定期的に書込む株式ティッカー データ ジェネレーターを作成しました。 Kafka トピックにデータをコピーするようにソース コネクタを構成し、そのデータを新しい MongoDB 時系列コレクションに書込むように Sink Connector を構成しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。