トピックの名前付け
Overview
このページの例は、MongoDB Kafka ソース コネクタを構成して、レコードを公開するトピックの名前をカスタマイズする方法を示しています。
デフォルトでは、MongoDB Kafka ソース コネクタは、変更イベントが発生した MongoDB名前空間と同じ名前の Kafka トピックに変更イベント データを公開します。 string名前空間は、データベース名とコレクション名をドット({0.
)文字で連結した です。
次のセクションでは、コネクタが変更イベント データを公開する Kafka トピックをカスタマイズするさまざまな方法を示します。
トピック プレフィックス
変更イベント データの名前空間の先頭に string を付加し、その Kafka トピックにレコードを公開するようにソース コネクタを構成できます。 この設定により、プレフィックスと名前空間が「.」と自動的に連結されます。 文字の。
トピック プレフィックスを指定するには、次の例に示すようにtopic.prefix
構成設定を使用します。
topic.prefix=myPrefix database=test collection=data
設定が完了すると、コネクタはtest
データベース内のdata
コレクションに対するすべての変更を、 myPrefix.test.data
という名前の Kafka トピックに公開します。
トピックサフィックス
変更イベント データの名前空間に string を追加し、その Kafka トピックにレコードを公開するようにソース コネクタを構成できます。 この設定により、名前空間とサフィックスが「」と自動的に連結されます。 文字の。
トピック サフィックスを指定するには、次の例に示すようにtopic.suffix
構成設定を使用します。
topic.suffix=mySuffix database=test collection=data
設定が完了すると、コネクタはtest
データベース内のdata
コレクションに対するすべての変更を、 test.data.mySuffix
という名前の Kafka トピックに公開します。
トピック名前空間マップ
ソース コネクタを構成して、名前空間値を受信変更イベント データの Kafka トピック名にマッピングできます。 トピック名前空間マップには、名前空間パターンと宛先トピック名テンプレートで構成されるペアが含まれています。
次のセクションでは、connector が名前空間を解釈し、それを Kafka トピックにマッピングする方法について説明します。 データベースとコレクションを Kafka トピックに直接マッピングするだけでなく、コネクターはトピック名前空間マップで正規表現とワイルドカードのペアの使用をサポートしています。
名前空間マップ内のペアの順序は、コネクタがイベントをトピックに書き込む方法に影響する可能性があります。 connector は、次の順序で名前空間を照合します。
名前空間パターン内のデータベース名とコレクション名とペア。 この名前空間パターンの詳細については、「データベース名とコレクション名の例 」を参照してください。
名前空間パターン内にデータベース名のみを持つペア。 この名前空間パターンの詳細については、「データベース名とコレクション名の例 」を参照してください。
正規表現のペアを順番に並べ替えます。 この名前空間パターンの詳細については、「正規表現の例 」を参照してください。
ワイルドカード ペア。 この名前空間パターンの詳細については、ワイルドカードの例 を参照してください。
データベース名とコレクション名
トピック名前空間マップ内の特定のデータベースとコレクションの名前を指定して、これらのソースから Kafka トピックに変更イベントを書込み (write) できます。
変更イベントのデータベース名または名前空間がマップ内のフィールドのいずれかと一致する場合、コネクタは、そのマッピングに対応するトピック名テンプレートに基づいて宛先トピック名を計算し、イベントをこのトピックに公開します。
変更イベントのデータベース名または名前空間がどのマッピングとも一致しない場合、別のトピック命名設定で特に指定されていない限り、connector はデフォルトのトピック命名スキームを使用してレコードを公開します。
重要
/
文字は名前空間が正規表現パターンであることを示すため、名前空間にこの文字が非正規表現のコンテキストに含まれている場合、コネクタはConnectConfigException
を発生させます。
データベースとコレクションの両方を含むマッピングは、ソース データベース名のみを指定するマッピングよりも優先されます。
重要
名前空間マップの一致は、コネクタが他のトピック命名設定を適用する前に発生します。 定義されている場合、コネクタはマッピング後にドキュメント名にtopic.prefix
とtopic.suffix
設定を適用します。
次の例は、 topic.namespace.map
設定を指定して、 carDb
データベースからautomobiles
トピック名テンプレートへ、およびcarDb.ev
名前空間からelectricVehicles
トピック名テンプレートへのトピック名前空間マッピングを定義する方法を示しています。
topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles"}
carDb.ev
名前空間マッピングはcarDb
マッピングよりも優先されるため、コネクタは次のアクションを実行します。
変更イベントがデータベース
carDb
とコレクションev
から送信された場合、コネクタは宛先をelectricVehicles
トピックに設定します。変更イベントがデータベース
carDb
とev
以外のコレクションから送信された場合、コネクタは宛先をautomobiles.<collection name>
トピックに設定します。変更ドキュメントが
carDb
以外のデータベースから取得された場合、コネクタは宛先トピックをデフォルトの名前空間命名スキームに設定します。topic.prefix
とtopic.suffix
設定を定義すると、コネクタは名前空間マッピングを実行した後、その値を宛先トピック名に適用します。
正規表現
トピック名前空間マップ内で正規表現(regex)を使用できます。 正規表現を使用するには、名前空間パターンをスラッシュ( /
)文字で開始します。 java.util.regex.Pattern
クラスで指定された構文と動作に従って正規表現を作成します。
connector は、トピック名テンプレートに対して変数を展開してトピック名を計算します。 connector は次の変数をサポートしています。
db
: 一致する名前空間からのデータベース名。sep
:topic.separator
構成プロパティの値。 To learn more about this property, see Kafka Topic Properties.coll
: 一致する名前空間からのコレクション名。コレクション名がない場合は空のstring 。sep_coll
:sep
変数の値がプレフィックスが付いたcoll
変数の値、またはcoll
の値が空の場合は空の string。coll_sep
:sep
変数の値が末尾に追加されたcoll
変数の値、またはcoll
の値が空の場合は空の string。sep_coll_sep
:sep
変数の値がプレフィックスとサフィックスが付いたcoll
変数の値、またはcoll
の値が空の場合は空の string。
トピック名テンプレートで以前の変数のいずれかを使用する場合は、コネクタがそれらを展開するように中括弧( {}
)で囲む必要があります。 トピック名テンプレートでは、中括弧は他の目的で使用できません。
注意
エスケープ文字
名前空間パターンを作成するときは、エスケープする必要がある文字を適切に処理できることを確認してください。 たとえば、 .
を一致させるには、正規表現構文により\.
としてエスケープする必要があります。ただし、JSON 構文に従って、バックスラッシュ\
文字を\\
としてエスケープする必要があります。 次に、名前空間パターンで.
を一致させるには、それを\\.
として書込む必要があります。
この例では、コネクタが次のマッピングを実行するようにtopic.namespace.map
設定を指定する方法を示しています。
正規表現に一致するデータベースからのイベントを、
industrial{sep_coll}
トピック名テンプレートから計算されたトピックに書き込みます。 正規表現パターンは、vertical
データベース名を持つ任意の名前空間と一致します。vertical.health
名前空間のイベントをhealthcare
トピック名に書き込みます。 この場合、トピック名テンプレートと計算されたトピック名は同じになります。
topic.namespace.map={"/vertical(?:\\..*)?": "industrial{sep_coll}", "vertical.health": "healthcare"}
注意
この例では、 topic.separator
構成プロパティは"."
であり、デフォルト値です。
vertical.health
名前空間マッピングは正規表現の名前空間マッピングよりも優先されるため、コネクタは次のアクションを実行します。
変更イベントが
vertical
データベースのhealth
コレクションから送信される場合、コネクタは宛先をhealthcare
トピックに設定します。変更イベントが
vertical
データベース名を持つ他の名前空間からのものである場合、コネクタはindustrial{sep_coll}
トピック名テンプレートに基づいて宛先トピックを計算します。 次の例は、このマッピングを示しています。変更イベントが
vertical.wasteManagement
名前空間から取得されている場合、コネクタはindustrial.wasteManagement
トピックに書込みます。変更イベントが
vertical
データベースから取得され、特定のコレクションがない場合、コネクタはindustrial
トピックに書込みます。
変更ドキュメントが正規表現パターンに一致しないデータベースから取得されている場合、コネクタは宛先トピックをデフォルトの名前空間命名スキームに設定します。
topic.prefix
とtopic.suffix
設定を定義すると、コネクタは名前空間マッピングを実行した後、その値を宛先トピック名に適用します。
ワイルドカード
データベース名とコレクション名に示されているように、トピック名前空間マップでデータベース名と名前空間を指定するだけでなく、ワイルドカード*
を使用して、マッピングなしですべてのデータベースと名前空間の変更イベントを照合できます。
topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles", "*": "otherVehicles"}
前述のワイルドカードの例では、コネクタはcarDb
以外のすべてのデータベースに発生した変更ドキュメントをotherVehicles
トピックに公開します。