変更イベントをフィルタリングするためのパイプラインのカスタマイズ
この使用例では、MongoDB Kafka ソース コネクタが消費するデータをカスタマイズするようにパイプラインを構成する方法を示します。 パイプライン は、データをフィルタリングまたは変換するためのデータベースへの指示で構成される MongoDB 集計パイプラインです。
MongoDB は、変更ストリーム上の集計パイプラインに一致するデータの変更をコネクタに通知します。 変更ストリームは、MongoDB 配置に対してクライアントが行ったデータ変更をリアルタイムで記述するイベントのシーケンスです。 詳しくは、 Change Streams に関する MongoDB Server のマニュアル エントリを参照してください。
例
あなたは、特定のイベントで各ゲストの名前と訪問時間を収集する必要があるイベント コーディネーターであるとします。 ゲストがイベントにチェックインするたびに、アプリケーションは次の詳細を含む新しいドキュメントを挿入します。
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
connector pipeline
設定を定義して、変更ストリームに変更イベント情報をフィルタリングするように指示できます。
挿入操作の変更イベントを作成し、他のすべてのタイプの操作のイベントを省略します。
fullDocument.eventId
の値「321」と一致するドキュメントのみの変更イベントを作成し、他のすべてのドキュメントを省略します。_id
eventId
fullDocument
プロジェクションを使用して、 オブジェクトから フィールドと フィールドを省略します。
これらの変換を適用するには、次の集計パイプラインをpipeline
設定に割り当てます。
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
重要
パイプラインの結果に、MongoDB が再開トークンの値として使用するpayload
オブジェクトの最上位の_id
フィールドが含まれていることを確認します。
アプリケーションがサンプル ドキュメントを挿入すると、構成されたコネクターは次のレコードを Kafka トピックに公開します。
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
ソース コネクタを使用して変更ストリームを管理する方法の詳細については、 Change Streams に関するコネクタのドキュメントを参照してください。