既存のデータのコピー
この使用例では、MongoDB Kafka ソース コネクタを使用して、MongoDB コレクションから Apache Kafka トピックにデータをコピーする方法を示しています。
例
MongoDB コレクションを Apache Kafka にコピーし、いくつかのデータをフィルタリングするとします。
要件とソリューションは次のとおりです。
要件 | 解決法 |
---|---|
MongoDB 配置内の shopping データベースの customers コレクションを Apache Kafka トピックにコピーします。 | See the Copy Data section of this guide. |
country フィールドに "Mexco" の値があるドキュメントのみをコピーします。 | See the Filter Data section of this guide. |
customers
コレクションには次のドキュメントが含まれます。
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
データのコピー
ソース コネクタで次の構成オプションを指定して、 shopping
データベースのcustomers
コレクションの内容をコピーします。
database=shopping collection=customers startup.mode=copy_existing
ソース コネクタは、コレクションへの各ドキュメントの挿入を説明する変更イベント ドキュメントを作成してコレクションをコピーします。
注意
データコピーは重複したイベントを生成する可能性があります
ソース コネクタが既存のデータを変換するときに、いずれかのシステムがデータベース内のデータを変更した場合、MongoDB は最新の変更を反映するために重複した変更ストリーム イベントを生成することがあります。 データコピーが依存する変更ストリーム イベントは偶数であるため、コピーされたデータは結果整合性があります。
変更イベント ドキュメントの詳細については、 Change Streamsガイドをご覧ください。
startup.mode
オプションの詳細については、「起動プロパティ 」を参照してください。
フィルター データ
ソース コネクタ構成のstartup.mode.copy.existing.pipeline
オプションで集計パイプラインを指定することで、データをフィルタリングできます。 次の構成では、 country
フィールドに「Mexco」が含まれるすべてのドキュメントに一致する集計パイプラインを指定します。
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
startup.mode.copy.existing.pipeline
オプションの詳細については、「起動プロパティ 」を参照してください。
集計パイプラインの詳細については、次のリソースを参照してください。
MongoDB マニュアルの集計。
構成を指定する
customers
コレクションをコピーするための最終的なソース コネクタ構成は次のようになります。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
コネクタがデータをコピーすると、 shopping.customers
Apache Kafka トピックには、前述のサンプル コレクションに対応する次の変更イベント ドキュメントが表示されます。
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
注意
トピック内のデータをコレクションに書き込む
変更データキャプチャ ハンドラー を使用して、Apache Kafka トピック内の変更イベントドキュメントを MongoDB 書込み (write) 操作に変換します。 詳細については、「変更データ キャプチャ ハンドラーのガイド」を参照してください。