Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

モデル戦略の書込み

項目一覧

  • Overview
  • 一括書き込み操作
  • 書込みモデル戦略の指定方法
  • ビジネス キーの指定
  • 1 つのタイムスタンプ更新戦略
  • 1 つのビジネス キー戦略を置き換え
  • 1 つのビジネス キー戦略を削除
  • カスタムモデル戦略の書込み
  • サンプル書込みモデル戦略
  • 戦略をインストールする方法

このガイドでは、MongoDB Kafka Sink Connector が MongoDB にデータを書込む方法を変更する方法について説明します。

次のようなユースケースでは、connector が MongoDB にデータを書き込む方法を変更できます。

  • アップサートではなくドキュメントを挿入

  • _idフィールド以外のフィルターに一致するドキュメントを置換する

  • フィルターに一致するドキュメントを削除します

書込みモデル戦略を指定することで、コネクタが MongoDB にデータを書込む方法を構成できます。 書込み戦略 は、Sink Connector が書込みモデルを使用してデータを書込む方法を定義するクラスです。 書込みモデルは 、書込み操作の構造を定義する MongoDB Java ドライバーのインターフェースです。

コネクタが MongoDB に書き込む前にコネクタが受け取る Sink レコードを変更する方法については、 Sink Connector ポストプロセッサ のガイドをご覧ください。

書込みモデル戦略の実装については、 InsertOneDefaultStratey クラスのソースコードを参照してください。

Sink Connector は、一括書込み操作を使用して MongoDB にデータを書込みます。 一括書込みでは、挿入、更新、または削除などの複数の書込み操作がまとめられています。

デフォルトでは、Sink Connector は順序付き一括書き込みを実行し、データ変更の順序を保証します。 順序付き一括書き込みでは、いずれかの書き込み操作でエラーが発生した場合、コネクタはそのバッチ内の残りの書き込みをスキップします。

データ変更の順序を保証する必要がない場合は、 bulk.write.ordered設定をfalseに設定して、コネクタが順序なしの一括書き込みを実行するようにできます。 Sink Connector は順序付けなしの一括書き込みを並列に実行するため、パフォーマンスの向上が可能です。

さらに、順序なし一括書き込みを有効にし、 errors.tolerance設定をallに設定すると、一括書き込み内のいずれかの書込み操作が失敗した場合でも、コネクタはバッチ内のエラーを返さない残りの書込み操作を実行し続けます。

Tip

bulk.write.ordered 設定の詳細については、「 Connectorメッセージ処理プロパティ 」を参照してください。

一括書き込み操作の詳細については、次のドキュメントを参照してください。

書込み (write) モデル戦略を指定するには、次の設定を使用します。

writemodel.strategy=<write model strategy classname>

コネクタに含まれる事前に構築された書込みモデル戦略のリストについては、書込みモデル戦略の構成に関するガイドを参照してください。

ビジネス キーは、一意であると識別する Sink レコード内の 1 つ以上のフィールドで構成される値です。 デフォルトでは、Sink Connector は Sink レコードの_idフィールドを使用してビジネスキーを取得します。 別のビジネスキーを指定するには、カスタム値を使用するように Document ID Adder ポストプロセッサを構成します。

次のサンプル プロパティに示すように、Sink レコード キーの _id フィールドを設定するようにドキュメントID追加機能を構成できます。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

あるいは、次のサンプル プロパティに示すように、sink レコード値から_idフィールドを設定するように構成することもできます。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

重要

書込みパフォーマンスの向上

ビジネス キーのフィールドに対応するターゲット コレクションに一意のインデックスを作成します。 これにより、Sink Connector からの書き込み操作のパフォーマンスが向上します。 詳細については、一意なインデックスに関するガイドを参照してください。

次の書込みモデル戦略にはビジネスキーが必要です。

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

Document ID Adder ポストプロセッサの詳細については、「 Document ID Adder ポストプロセッサID構成 」を参照してください。

このセクションでは、次の書込みモデル戦略の構成と出力の例を示します。

MongoDB にドキュメントを書き込むときに、タイムスタンプを追加および更新するように Update One Timestamps 戦略を構成できます。 この戦略は、次のアクションを実行します。

  • コネクタが新しい MongoDB ドキュメントを挿入すると、 フィールドと_insertedTS _modifiedTSフィールドにコネクタのサーバー上の現在時刻が設定されます。

  • connectorが既存の MongoDB ドキュメントを更新すると、 _modifiedTSフィールドがコネクタのサーバー上の現在時刻に更新されます。

ルート上の指定された時刻を追跡し、Sink Connector が次の構造のメッセージを受信したとします。

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

ProvidedInValueStrategyを使用して、コネクタが メッセージの_id値を使用して MongoDB ドキュメントの_idフィールドを割り当てる必要があることを指定します。 次のように、ID と書込みモデル戦略プロパティを指定します。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Sink Connector が前述のレコードを処理した後、次のドキュメントに示すように、_insertedTS フィールドと_modifiedTS フィールドを含むドキュメントを挿入します。

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

1 時間後、ドキュメントは、次のレコードに示すように、ルート上の新しい場所を新しい位置で報告します。

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Sink Connector が前述のレコードを処理すると、次のデータを含むドキュメントを挿入します。

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

ProvidedInValueStrategyの詳細については、 ドキュメント ID 追加 ポストプロセッサ を構成する方法のセクションを参照してください。

ビジネス キーの値に一致するドキュメントを置き換えるには、 1 つのビジネス キーの置換 戦略を構成します。 レコードの複数のフィールドにビジネス キーを定義し、一致するビジネス キーを含むドキュメントを置き換えるように connector を構成するには、次のタスクを実行します。

  1. ビジネス キー フィールドに対応するコレクション内に一意のインデックスを作成します。

  2. connector構成でビジネスキーに属するフィールドを識別するには、 PartialValueStrategy ID 戦略を指定します。

  3. connector 構成でReplaceOneBusinessKeyStrategy書込みモデル戦略を指定します。

それぞれflight_noairport_codeで表されるドキュメント番号とフィールドのロケーションにより、航空会社の容量を追跡するとします。 メッセージの例には、次の情報が含まれています。

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

flight_noairport_codeをビジネスキーとして使用して戦略を実装するには、まず MongoDB shell でこれらのフィールドに一意のインデックスを作成します。

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

次に、プロジェクション リストでPartialValueStrategy戦略とビジネスキー フィールドを指定します。 次のように、ID と書込みモデル戦略構成を指定します。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

コレクションに挿入されるサンプル データには、次のものが含まれます。

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

コネクタが既存のドキュメントのビジネスキーと一致するデータを処理する際に、ビジネスキー フィールドを変更せずにドキュメントを新しい 値に置き換えます。

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

コネクタが Sink データを処理した後、MongoDB 内の元のサンプル ドキュメントを前のサンプル ドキュメントに置き換えます。

1 つのビジネス キーの削除 戦略を使用して、ビジネス キーに一致するメッセージを受信した場合に、ドキュメントを削除するようにコネクターを構成できます。 レコードの複数のフィールドからビジネスキーを設定し、一致するビジネスキーを含むドキュメントを削除するようにコネクターを構成するには、次のタスクを実行します。

  1. MongoDB コレクションに、ビジネス キー フィールドに対応する一意のインデックスを作成します。

  2. connector構成でビジネスキーに属するフィールドを識別するための ID 戦略としてPartialValueStrategyを指定します。

  3. connector 構成でDeleteOneBusinessKeyStrategy書込みモデル戦略を指定します。

次のようなドキュメントを含むコレクションから、特定の年の暦上のイベントを削除する必要があるとします。

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

yearをビジネスキーとして使用して戦略を実装するには、まず MongoDB shell でこれらのフィールドに一意のインデックスを作成します。

db.collection.createIndex({ "year": 1 }, { unique: true })

次に、次のように、構成でビジネス キーと書込みモデル戦略を指定します。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

connector がビジネスキーyearを含む Sink レコードを処理する場合、MongoDB によって返された フィールド値が一致する最初のドキュメントが削除されます。 connector が、次の値データを含む Sink レコードを処理するとします。

{
"year": 2005,
...
}

コネクタが前述のレコードを処理すると、元の「Dentist Appointment」サンプル ドキュメントなど、値が「 2005 」であるyearフィールドを含む最初のドキュメントが コレクションから削除されます。

connectorに含まれる書込みモデル戦略でユースケースに適合するものがない場合は、独自の戦略を作成できます。

書込み戦略モデルは、 WriteModelStrategyインターフェースを実装し、 createWriteModel()メソッドをオーバーライドする必要がある Java クラスです。

WriteModelStratey インターフェースのソースコードを 参照してください 必要なメソッド署名の場合

次のカスタム書込みモデル戦略は、Sink レコードの_idフィールドに一致する MongoDB ドキュメントを Sink レコードのfullDocumentフィールドの値に置き換える書込み操作を返します。

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

カスタム書込みモデル戦略の別の例については、 UpsertAsPartOfDocumentStratey Githubのサンプル戦略。

カスタム書込み戦略を使用するように Sink Connector を構成するには、次のアクションを完了する必要があります。

  1. カスタム書込み戦略クラスを JAR ファイルにコンパイルします。

  2. コンパイルされた JAR をKafkaワーカーのクラスパス/プラグイン パスに追加します。 プラグイン パスの詳細については、 Confluent のドキュメントを参照してください。

    注意

    Kafka Connect はプラグインを分離してロードします。 カスタム書込み戦略を配置する場合、connector JAR と書込みモデル戦略 JAR の両方が同じパスにある必要があります。 パスは次のようになります。

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    Kafka Connect プラグインの詳細については、Confluent のこちらのガイドを参照してください。

  3. writemodel.strategs構成設定でカスタム クラスを指定します。

クラスを JAR ファイルにコンパイルする方法については、 JAR 配置ガイド を参照してください。 Java SE ドキュメントに含まれるクエリ操作の例を取りあげます。

戻る

Fundamentals