Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

既存のデータのコピー

この使用例では、MongoDB Kafka ソース コネクタを使用して、MongoDB コレクションから Apache Kafka トピックにデータをコピーする方法を示しています。

MongoDB コレクションを Apache Kafka にコピーし、いくつかのデータをフィルタリングするとします。

要件とソリューションは次のとおりです。

要件
解決法

MongoDB 配置内のshoppingデータベースの customersコレクションを Apache Kafka トピックにコピーします。

See the Copy Data section of this guide.

countryフィールドに "Mexco" の値があるドキュメントのみをコピーします。

See the Filter Data section of this guide.

customers コレクションには次のドキュメントが含まれます。

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

ソース コネクタで次の構成オプションを指定して、 shoppingデータベースのcustomersコレクションの内容をコピーします。

database=shopping
collection=customers
startup.mode=copy_existing

ソース コネクタは、コレクションへの各ドキュメントの挿入を説明する変更イベント ドキュメントを作成してコレクションをコピーします。

注意

データコピーは重複したイベントを生成する可能性があります

ソース コネクタが既存のデータを変換するときに、いずれかのシステムがデータベース内のデータを変更した場合、MongoDB は最新の変更を反映するために重複した変更ストリーム イベントを生成することがあります。 データコピーが依存する変更ストリーム イベントは偶数であるため、コピーされたデータは結果整合性があります。

変更イベント ドキュメントの詳細については、 Change Streamsのガイドを参照してください。

startup.modeオプションの詳細については、「起動プロパティ 」を参照してください。

ソース コネクタ構成のstartup.mode.copy.existing.pipelineオプションで集計パイプラインを指定することで、データをフィルタリングできます。 次の構成では、 countryフィールドに「Mexco」が含まれるすべてのドキュメントに一致する集計パイプラインを指定します。

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

startup.mode.copy.existing.pipelineオプションの詳細については、「起動プロパティ 」を参照してください。

集計パイプラインの詳細については、次のリソースを参照してください。

  • 変更イベントをフィルタリングするためのパイプラインのカスタマイズの使用例

  • MongoDB マニュアルの集計

customersコレクションをコピーするための最終的なソース コネクタ構成は次のようになります。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

コネクタがデータをコピーすると、 shopping.customers Apache Kafka トピックには、前述のサンプル コレクションに対応する次の変更イベント ドキュメントが表示されます。

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

注意

トピック内のデータをコレクションに書き込む

変更データキャプチャ ハンドラー を使用して、Apache Kafka トピック内の変更イベントドキュメントを MongoDB 書込み (write) 操作に変換します。 詳細については、「変更データ キャプチャ ハンドラーのガイド」を参照してください。

戻る

トピックの名前付け