Sink Connector ポストプロセッサ
項目一覧
Overview
このページでは、MongoDB Kafka シンク コネクタでポストプロセッサを構成する方法を学習できます。 ポストプロセッサは、コネクタが MongoDB コレクションに保存する前に、コネクタが Kafka トピックから読み取った Sink レコードを変更します。 ポストプロセッサが行うことができるデータ変更のいくつかの例には、次のようなものがあります。
ドキュメント
_id
フィールドをカスタム値に設定メッセージのキー フィールドまたは値フィールドを含める、または除外する
フィールドの名前を変更する
コネクターに含まれる事前に構築されたポストプロセッサを使用するか、独自の実装を使用できます。
ポストプロセッサの詳細については、次のセクションを参照してください。
ポストプロセッサによるデータの変更方法
ポストプロセッサは、 Kafka トピックから読み取られたデータを変更します。 connectorは、 Kafka SinkRecord
のキーと値のフィールドの表現を含むSinkDocument
クラスにメッセージを保存します。 コネクタは、構成で指定されたすべてのポストプロセッサを順番に適用し、その結果を MongoDB コレクションに保存します。
ポストプロセッサは、ドキュメント_id
フィールドの生成、メッセージ キーまたは値 フィールドのプロジェクション、フィールドの名前変更などのデータ変更タスクを実行します。 コネクタに含まれる事前構築済みのポストプロセッサを使用することも、 PostProcessor を拡張して独自の を実装することもできます クラス。
重要
ポストプロセッサと変更データキャプチャ(CDC)ハンドラー
CDC ハンドラーイベント データに ポストプロセッサ を適用することはできません。 両方を指定すると、コネクタは警告をログに記録します。
ポストプロセッサの指定方法
post.processor.chain
構成設定では、1 つ以上のポストプロセッサをカンマ区切りのリストとして指定できます。 複数を指定すると、コネクタはそれらを順番に適用し、各ポストプロセッサは前のもののデータ出力を変更します。
connector が MongoDB に書き込むドキュメントに一意の_id
フィールドが含まれるようにするには、DocumentIdAdder
ポストプロセッサを含めない場合、チェーンの最初の位置にポストプロセッサが自動的に追加されます。
次の設定例では、コネクタが最初にKafkaMetaAdder
ポストプロセッサを実行し、次にAllowListValueProjector
ポストプロセッサを出力で実行することを指定します。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
事前に構築されたポストプロセッサ
次の表には、Sink Connector に含まれるすべてのポストプロセッサのリストが含まれています。
プロセッサ名の変更 | 説明 | |
---|---|---|
DocumentIdAdder | Full Path:
Inserts an _id field determined by the configured strategy.The default strategy is BsonOidStrategy .For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
BlockListKeyProjector | Full Path:
Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
BlockListValueProjector | Full Path:
Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListKeyProjector | Full Path:
Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListValueProjector | Full Path:
Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
KafkaMetaAdder | Full Path:
Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
RenameByMapping | Full Path:
Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
RenameByRegex | Full Path:
Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. | |
NullFieldValueRemover | Full Path:
Removes all document fields that contain null values from the sink record. |
ドキュメント ID 追加用ポストプロセッサの構成
DocumentIdAdder
ポストプロセッサは、戦略を使用して、MongoDB ドキュメントの_id
フィールドの形式方法を決定します。 戦略は、ユースケースに合わせてカスタマイズできる事前設定された動作を定義します。
次の例に示すように、 document.id.strategy
設定でこのポストプロセッサの戦略を指定できます。
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
次の表は、 DocumentIdAdder
ポストプロセッサを構成するために使用できる戦略の一覧を示しています。
戦略名 | 説明 | |
---|---|---|
BsonOidStrategy | Full Path:
Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. | |
KafkaMetaDataStrategy | Full Path:
Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
FullKeyStrategy | Full Path:
Uses the complete key structure of the sink document to generate the
value for the _id field.Defaults to a blank document if no key exists. | |
ProvidedInKeyStrategy | Full Path:
Uses the _id field specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
ProvidedInValueStrategy | Full Path:
Uses the _id field specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
PartialKeyStrategy | Full Path:
Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
PartialValueStrategy | Full Path:
Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
UuidProvidedInKeyStrategy | Full Path:
Converts the _id key field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidProvidedInValueStrategy | Full Path:
Converts the _id value field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidStrategy | Full Path:
Uses a randomly generated UUID in string format. |
カスタムドキュメント ID 戦略の作成
組み込みのドキュメント ID 追加戦略でユースケースをカバーしない場合は、以下の手順に従ってカスタムのドキュメント ID 戦略を定義できます。
インターフェース IdStratey を実装する Java クラスを作成します と にはカスタム構成ロジックが含まれます。
クラスを JAR ファイルにコンパイルします。
コンパイルされた JAR を、すべての Kafka ワーカーのクラス パスまたはプラグイン パスに追加します。 プラグイン パスの詳細については、 Confluent のドキュメントを参照してください。
すべての Kafka ワーカーのカスタム クラスの完全なクラス名に
document.id.strategy
設定を更新します。
注意
選択した戦略は提供セマンティクスに影響を与える可能性があります
BSON ObjectId または UUID 戦略では、コネクタが再試行またはレコードを再度処理するときに新しい ID を生成するため、少なくとも 1 回の配信のみを保証できます。 他の戦略では、ドキュメント ID を構成するフィールドが一意であることを保証できる場合は、1 回限りの配信が許可されます。
インターフェースの実装例については、IdStrategy
ID 戦略の実装 を含むソースコード ディレクトリを参照してください コネクタにパッケージ化されています。
ポストプロセッサの例
このセクションでは、次のタイプのポストプロセッサの構成例とサンプル出力を示します。
許可リストとブロック リストの例
許可リストとブロックリストのプロジェクションのポストプロセッサは、出力に含めるフィールドと除外するフィールドを決定します。
許可リストプロジェクションを使用する場合、ポストプロセッサは指定されたフィールドからのデータのみを出力します。
ブロックリストプロジェクションを使用する場合、指定されたフィールドのデータのみが後処理では省略されます。
注意
を使用できます。 レコード内のネストされたフィールドを参照するための(ドット)表記。 表記を使用して、配列内のドキュメントのフィールドを参照することもできます。
ポストプロセッサ チェーンにプロジェクションを追加する場合は、プロジェクションのタイプと、Sink ドキュメントのキーまたは値の部分にそれを適用するかどうかを指定する必要があります。
プロジェクションの構成と出力の例については、次のセクションを参照してください。
許可リストプロジェクションの例
Kafka レコード値ドキュメントが次のユーザー プロファイル データに似ているとします。
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
次の設定を使用して、値ドキュメントの「name」、「address. Atlas」、「趣味」フィールドなどの選択データを保存するようにAllowList
値プロジェクションを構成できます。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
ブロック リスト プロジェクションの例
Kafka レコード キー ドキュメントが次のユーザー識別データに似ているとします。
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
"authToken" と "registration.source" を省略するようにBlockList
キー プロジェクションを構成できます フィールド(次の設定でデータを保存する前に)。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
プロジェクション ワイルドカード パターン マッチングの例
このセクションでは、フィールド名と一致するワイルドカード パターンを一致するようにプロジェクションのポストプロセッサを構成する方法を示します。
パターン | 説明 |
| 現在のレベル内の任意の文字数と一致します。 |
| 現在のレベルおよびネストされたすべてのレベル内の任意の文字と一致します。 |
このセクションの許可リストとブロックリストのワイルドカード パターン マッチングの例については、気象測定値を含む次の値ドキュメントを参照してください。
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
許可リスト ワイルドカードの例
複数のフィールド名を一致させるには、 *
ワイルドカードを使用します。 次のサンプル構成では、次のフィールドが一致しています。
最上位のフィールドには「City」
"wind_step" という名前で始まる最上位フィールドのサブドキュメントである "average" という名前のフィールド。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
ポストプロセッサが許可リストプロジェクションを適用した後、次のレコードを出力します。
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
ワイルドカードを指定したレベルから任意のレベルのオブジェクトに一致する**
ワイルドカードを使用できます。 次のワイルドカードに一致する例では、「low」という名前のフィールドを含む任意のドキュメントをプロジェクションします。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
プロジェクションを適用する ポストプロセッサ は、次のレコードを出力します。
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
ブロック リスト ワイルドカードの例
次のブロックリスト構成例に示すように、ワイルドカード パターンを使用して、特定のドキュメント レベルのフィールドを一致させることができます。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
フィールドの名前変更の例
このセクションでは、 RenameByMapping
とRenameByRegex
フィールド リネーム ポストプロセッサを構成して、シンク レコードのフィールド名を更新する方法を説明します。 フィールドの名前変更設定では、以下を指定します。
レコード内のキーまたは値のドキュメントを更新するかどうか
更新するフィールド名
新しいフィールド名
JSON 配列でRenameByMapping
とRenameByRegex
の設定を指定する必要があります。 ドット表記 または パターン一致 のいずれかを使用して、ネストされたフィールドを指定できます。
フィールドリネームのポストプロセッサの例では、次の例のシンク レコードを使用します。
キー ドキュメント
{ "location": "Provence", "date_month": "October", "date_day": 17 }
値ドキュメント
{ "flapjacks": { "purchased": 598, "size": "large" } }
マッピング例による名前の変更
RenameByMapping
ポストプロセッサ 設定では、string に一致するフィールドを新しい名前に割り当てる 1 つ以上の JSON オブジェクトを指定します。 各オブジェクトには、以下の表に示すように、 oldName
要素に一致するテキストと、 newName
要素内の置換テキストが含まれています。
キー名 | 説明 |
---|---|
oldName | キーまたは値のドキュメントと、置き換えるフィールド名を一致させるかどうかを指定します。 設定では「」が使用されます。 文字を使用して、2 つの値を区切ります。 |
newName | フィールドのすべての一致の置換フィールド名を指定します。 |
次のサンプル プロパティは、キー ドキュメントの「location」フィールドを照合し、その名前を「country」に変更します。
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
この設定は、 RenameByMapping
ポストプロセッサ に元のキー ドキュメントを次のドキュメントに変換するように指示します。
{ "country": "Provence", "date_month": "October", "date_day": 17 }
次のように、 oldName
フィールドにフィールド名が追加された値のドキュメントを指定することで、値のドキュメントでも同様のフィールド名の割り当てを実行できます。
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
この設定は、 RenameByMapping
ポストプロセッサ に、元の値ドキュメントを次のドキュメントに変換するように指示します。
{ "crepes": { "purchased": 598, "size": "large" } }
次の設定に示すように、string 形式の JSON 配列を使用して、 field.renamer.mapping
プロパティで 1 つ以上のマッピングを指定することもできます。
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
正規表現による名前の変更
RenameByRegex
ポストプロセッサ 設定では、一致する必要があるフィールド名とテキスト パターンと、一致したテキストの置換値を指定します。 次の表で説明されているフィールドを含む JSON オブジェクトに、1 つ以上の名前変更式を指定できます。
キー名 | 説明 |
---|---|
regexp | 置換を実行するフィールドに一致する正規表現が含まれます。 |
パターン | 置き換えるテキストに一致する正規表現を含みます。 |
replace |
|
次の例の設定では、ポストプロセッサに次の処理を実行するように指示します。
キー ドキュメント内の「date」で始まる任意のフィールド名と一致します。 一致するフィールドのセットで、パターン
_
に一致するすべてのテキストを-
文字に置き換えます。crepes
のサブドキュメントである、値ドキュメント内の任意のフィールド名と一致します。 一致するフィールドのセットで、パターンpurchased
に一致するすべてのテキストをquantity
に置き換えます。
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
コネクタがサンプル キー ドキュメントとサンプル 値ドキュメントに ポストプロセッサ を適用すると、次の内容を出力します。
キー ドキュメント
{ "location": "Provence", "date-month": "October", "date-day": 17 }
値ドキュメント
{ "crepes": { "quantity": 598, "size": "large" } }
警告
リネーム後のプロセッサは既存のフィールド名を上書きしません
リネーム ポストプロセッサで に設定するターゲット フィールド名により、同じドキュメント内に重複するフィールド名が発生する可能性があります。 これを回避するために、ポストプロセッサは、ドキュメントの同じレベルで既存のフィールド名が重複する場合、名前の変更をスキップします。
カスタムポストプロセッサの作成方法
組み込みのポストプロセッサがユースケースをカバーしない場合は、次の手順を使用してカスタムのポストプロセッサクラスを作成できます。
PostProcessor を拡張する Java クラスを作成する 抽象クラス。
クラス内の
process()
メソッドをオーバーライドします。 シンク レコードのキー フィールドと値フィールドの BSON 表現であるSinkDocument
を更新し、 メソッドで元の KafkaSinkRecord
にアクセスできます。クラスを JAR ファイルにコンパイルします。
コンパイルされた JAR を、すべての Kafka ワーカーのクラス パスまたはプラグイン パスに追加します。 プラグイン パスの詳細については、Confluent のドキュメント「 Community Connector の手動インストール 」を参照してください。
ポストプロセッサの完全なクラス名を書き込みプロセッサ チェーン構成に追加します。
ポストプロセッサの例について は、組み込みのポストプロセッサ クラスのソースコードを参照できます。