変更イベントをフィルタリングするためのパイプラインのカスタマイズ
この使用例では、MongoDB Kafka ソース コネクタが消費するデータをカスタマイズするようにパイプラインを構成する方法を示します。 パイプライン は、データをフィルタリングまたは変換するためのデータベースへの指示で構成される MongoDB 集計パイプラインです。
MongoDB は、変更ストリーム上の集計パイプラインに一致するデータの変更をコネクタに通知します。 変更ストリームは、MongoDB 配置に対してクライアントが行ったデータ変更をリアルタイムで記述するイベントのシーケンスです。 詳しくは、 Change Streams に関する MongoDB サーバーのマニュアル エントリを参照してください。
例
イベントを調整していて、特定のイベントでの各ゲストの名前と訪問時間を収集したいとします。 ゲストがイベントにチェックインするたびに、アプリケーションは次の詳細を含む新しいドキュメントを挿入します。
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
connector pipeline
設定を定義して、変更ストリームに変更イベント情報をフィルタリングするように指示できます。
挿入操作の変更イベントを作成し、他のすべてのタイプの操作のイベントを省略します。
fullDocument.eventId
の値「321」と一致するドキュメントのみの変更イベントを作成し、他のすべてのドキュメントを省略します。_id
eventId
fullDocument
プロジェクションを使用して、 オブジェクトから フィールドと フィールドを省略します。
これらの変換を適用するには、次の集計パイプラインをpipeline
設定に割り当てます。
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
重要
_id
ns
パイプラインの結果に、payload
オブジェクトの最上位の フィールドと フィールドが含まれていることを確認します。MongoDB は、再開トークンの値としてid
を使用し、 Kafka 出力トピック名を生成するためにns
を使用します。
アプリケーションがサンプル ドキュメントを挿入すると、構成されたコネクターは次のレコードを Kafka トピックに公開します。
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
ソース コネクタを使用して変更ストリームを管理する方法の詳細については、 Change Streams に関するコネクタのドキュメントを参照してください。