Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

Sink Connector ポストプロセッサ

項目一覧

  • Overview
  • ポストプロセッサによるデータの変更方法
  • ポストプロセッサの指定方法
  • 事前に構築されたポストプロセッサ
  • ドキュメント ID 追加用ポストプロセッサの構成
  • ポストプロセッサの例
  • 許可リストとブロック リストの例
  • 許可リストプロジェクションの例
  • ブロック リスト プロジェクションの例
  • プロジェクション ワイルドカード パターン マッチングの例
  • フィールドの名前変更の例
  • カスタムポストプロセッサの作成方法

このページでは、MongoDB Kafka シンク コネクタでポストプロセッサを構成する方法を学習できます。 ポストプロセッサは、コネクタが MongoDB コレクションに保存する前に、コネクタが Kafka トピックから読み取った Sink レコードを変更します。 ポストプロセッサが行うことができるデータ変更のいくつかの例には、次のようなものがあります。

  • ドキュメント _idフィールドをカスタム値に設定

  • メッセージのキー フィールドまたは値フィールドを含める、または除外する

  • フィールドの名前を変更する

コネクターに含まれる事前に構築されたポストプロセッサを使用するか、独自の実装を使用できます。

ポストプロセッサの詳細については、次のセクションを参照してください。

ポストプロセッサは、 Kafka トピックから読み取られたデータを変更します。 connectorは、 Kafka SinkRecordのキーと値のフィールドの表現を含むSinkDocumentクラスにメッセージを保存します。 コネクタは、構成で指定されたすべてのポストプロセッサを順番に適用し、その結果を MongoDB コレクションに保存します。

ポストプロセッサは、ドキュメント_idフィールドの生成、メッセージ キーまたは値 フィールドのプロジェクション、フィールドの名前変更などのデータ変更タスクを実行します。 コネクタに含まれる事前構築済みのポストプロセッサを使用することも、 PostProcessor を拡張して独自の を実装することもできます クラス。

重要

ポストプロセッサと変更データキャプチャ(CDC)ハンドラー

CDC ハンドラーイベント データに ポストプロセッサ を適用することはできません。 両方を指定すると、コネクタは警告をログに記録します。

post.processor.chain構成設定では、1 つ以上のポストプロセッサをカンマ区切りのリストとして指定できます。 複数を指定すると、コネクタはそれらを順番に適用し、各ポストプロセッサは前のもののデータ出力を変更します。

connector が MongoDB に書き込むドキュメントに一意の_idフィールドが含まれるようにするには、DocumentIdAdder ポストプロセッサを含めない場合、チェーンの最初の位置にポストプロセッサが自動的に追加されます。

次の設定例では、コネクタが最初にKafkaMetaAdderポストプロセッサを実行し、次にAllowListValueProjectorポストプロセッサを出力で実行することを指定します。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

次の表には、Sink Connector に含まれるすべてのポストプロセッサのリストが含まれています。

プロセッサ名の変更
説明
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.

DocumentIdAdderポストプロセッサは、戦略を使用して、MongoDB ドキュメントの_idフィールドの形式方法を決定します。 戦略は、ユースケースに合わせてカスタマイズできる事前設定された動作を定義します。

次の例に示すように、 document.id.strategy設定でこのポストプロセッサの戦略を指定できます。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

次の表は、 DocumentIdAdderポストプロセッサを構成するために使用できる戦略の一覧を示しています。

戦略名
説明
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

組み込みのドキュメント ID 追加戦略でユースケースをカバーしない場合は、以下の手順に従ってカスタムのドキュメント ID 戦略を定義できます。

  1. インターフェース IdStratey を実装する Java クラスを作成します と にはカスタム構成ロジックが含まれます。

  2. クラスを JAR ファイルにコンパイルします。

  3. コンパイルされた JAR を、すべての Kafka ワーカーのクラス パスまたはプラグイン パスに追加します。 プラグイン パスの詳細については、 Confluent のドキュメントを参照してください。

  4. すべての Kafka ワーカーのカスタム クラスの完全なクラス名にdocument.id.strategy設定を更新します。

注意

選択した戦略は提供セマンティクスに影響を与える可能性があります

BSON ObjectId または UUID 戦略では、コネクタが再試行またはレコードを再度処理するときに新しい ID を生成するため、少なくとも 1 回の配信のみを保証できます。 他の戦略では、ドキュメント ID を構成するフィールドが一意であることを保証できる場合は、1 回限りの配信が許可されます。

インターフェースの実装例については、IdStrategy ID 戦略の実装 を含むソースコード ディレクトリを参照してください コネクタにパッケージ化されています。

このセクションでは、次のタイプのポストプロセッサの構成例とサンプル出力を示します。

許可リストブロックリストのプロジェクションのポストプロセッサは、出力に含めるフィールドと除外するフィールドを決定します。

許可リストプロジェクションを使用する場合、ポストプロセッサは指定されたフィールドからのデータのみを出力します。

ブロックリストプロジェクションを使用する場合、指定されたフィールドのデータのみが後処理では省略されます。

注意

を使用できます。 レコード内のネストされたフィールドを参照するための(ドット)表記。 表記を使用して、配列内のドキュメントのフィールドを参照することもできます。

ポストプロセッサ チェーンにプロジェクションを追加する場合は、プロジェクションのタイプと、Sink ドキュメントのキーまたは値の部分にそれを適用するかどうかを指定する必要があります。

プロジェクションの構成と出力の例については、次のセクションを参照してください。

Kafka レコード値ドキュメントが次のユーザー プロファイル データに似ているとします。

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

次の設定を使用して、値ドキュメントの「name」、「address. Atlas」、「趣味」フィールドなどの選択データを保存するようにAllowList値プロジェクションを構成できます。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Kafka レコード キー ドキュメントが次のユーザー識別データに似ているとします。

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

"authToken" と "registration.source" を省略するようにBlockListキー プロジェクションを構成できます フィールド(次の設定でデータを保存する前に)。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

このセクションでは、フィールド名と一致するワイルドカード パターンを一致するようにプロジェクションのポストプロセッサを構成する方法を示します。

パターン
説明
*
現在のレベル内の任意の文字数と一致します。
**
現在のレベルおよびネストされたすべてのレベル内の任意の文字と一致します。

このセクションの許可リストとブロックリストのワイルドカード パターン マッチングの例については、気象測定値を含む次の値ドキュメントを参照してください。

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

複数のフィールド名を一致させるには、 *ワイルドカードを使用します。 次のサンプル構成では、次のフィールドが一致しています。

  • 最上位のフィールドには「City」

  • "wind_step" という名前で始まる最上位フィールドのサブドキュメントである "average" という名前のフィールド。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

ポストプロセッサが許可リストプロジェクションを適用した後、次のレコードを出力します。

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

ワイルドカードを指定したレベルから任意のレベルのオブジェクトに一致する**ワイルドカードを使用できます。 次のワイルドカードに一致する例では、「low」という名前のフィールドを含む任意のドキュメントをプロジェクションします。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

プロジェクションを適用する ポストプロセッサ は、次のレコードを出力します。

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

次のブロックリスト構成例に示すように、ワイルドカード パターンを使用して、特定のドキュメント レベルのフィールドを一致させることができます。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

このセクションでは、 RenameByMappingRenameByRegexフィールド リネーム ポストプロセッサを構成して、シンク レコードのフィールド名を更新する方法を説明します。 フィールドの名前変更設定では、以下を指定します。

  • レコード内のキーまたは値のドキュメントを更新するかどうか

  • 更新するフィールド名

  • 新しいフィールド名

JSON 配列でRenameByMappingRenameByRegexの設定を指定する必要があります。 ドット表記 または パターン一致 のいずれかを使用して、ネストされたフィールドを指定できます。

フィールドリネームのポストプロセッサの例では、次の例のシンク レコードを使用します。

キー ドキュメント

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

値ドキュメント

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

RenameByMappingポストプロセッサ 設定では、string に一致するフィールドを新しい名前に割り当てる 1 つ以上の JSON オブジェクトを指定します。 各オブジェクトには、以下の表に示すように、 oldName要素に一致するテキストと、 newName要素内の置換テキストが含まれています。

キー名
説明
oldName
キーまたは値のドキュメントと、置き換えるフィールド名を一致させるかどうかを指定します。 設定では「」が使用されます。 文字を使用して、2 つの値を区切ります。
newName
フィールドのすべての一致の置換フィールド名を指定します。

次のサンプル プロパティは、キー ドキュメントの「location」フィールドを照合し、その名前を「country」に変更します。

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

この設定は、 RenameByMappingポストプロセッサ に元のキー ドキュメントを次のドキュメントに変換するように指示します。

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

次のように、 oldNameフィールドにフィールド名が追加された値のドキュメントを指定することで、値のドキュメントでも同様のフィールド名の割り当てを実行できます。

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

この設定は、 RenameByMappingポストプロセッサ に、元の値ドキュメントを次のドキュメントに変換するように指示します。

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

次の設定に示すように、string 形式の JSON 配列を使用して、 field.renamer.mappingプロパティで 1 つ以上のマッピングを指定することもできます。

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

RenameByRegexポストプロセッサ 設定では、一致する必要があるフィールド名とテキスト パターンと、一致したテキストの置換値を指定します。 次の表で説明されているフィールドを含む JSON オブジェクトに、1 つ以上の名前変更式を指定できます。

キー名
説明
regexp
置換を実行するフィールドに一致する正規表現が含まれます。
パターン
置き換えるテキストに一致する正規表現を含みます。
replace
patternフィールドで定義した正規表現のすべての一致の置換テキストが含まれます。

次の例の設定では、ポストプロセッサに次の処理を実行するように指示します。

  • キー ドキュメント内の「date」で始まる任意のフィールド名と一致します。 一致するフィールドのセットで、パターン_に一致するすべてのテキストを-文字に置き換えます。

  • crepesのサブドキュメントである、値ドキュメント内の任意のフィールド名と一致します。 一致するフィールドのセットで、パターンpurchasedに一致するすべてのテキストをquantityに置き換えます。

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

コネクタがサンプル キー ドキュメントサンプル 値ドキュメントに ポストプロセッサ を適用すると、次の内容を出力します。

キー ドキュメント

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

値ドキュメント

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

警告

リネーム後のプロセッサは既存のフィールド名を上書きしません

リネーム ポストプロセッサで に設定するターゲット フィールド名により、同じドキュメント内に重複するフィールド名が発生する可能性があります。 これを回避するために、ポストプロセッサは、ドキュメントの同じレベルで既存のフィールド名が重複する場合、名前の変更をスキップします。

組み込みのポストプロセッサがユースケースをカバーしない場合は、次の手順を使用してカスタムのポストプロセッサクラスを作成できます。

  1. PostProcessor を拡張する Java クラスを作成する 抽象クラス。

  2. クラス内のprocess()メソッドをオーバーライドします。 シンク レコードのキー フィールドと値フィールドの BSON 表現であるSinkDocumentを更新し、 メソッドで元の Kafka SinkRecordにアクセスできます。

  3. クラスを JAR ファイルにコンパイルします。

  4. コンパイルされた JAR を、すべての Kafka ワーカーのクラス パスまたはプラグイン パスに追加します。 プラグイン パスの詳細については、Confluent のドキュメント「 Community Connector の手動インストール 」を参照してください。

  5. ポストプロセッサの完全なクラス名を書き込みプロセッサ チェーン構成に追加します。

ポストプロセッサの例について は、組み込みのポストプロセッサ クラスのソースコードを参照できます。

戻る

モデル戦略の書込み