変換子
項目一覧
Overview
このガイドでは、 で 変換 を使用するMongoDBKafka Connector 方法について説明します。変換子は、バイトと Kafka Connect のランタイム データ形式の間で変換されるプログラムです。
変換ツールは Kafka Connect と Apache Kafka 間でデータを渡します。 connectorは MongoDB と Kafka Connect 間でデータを渡します。 次の図は、これらの関係を示しています。
変換者の詳細については、次のリソースを参照してください。
使用可能な変換子
コネクタが MongoDB データを Kafka Connect のランタイム データ形式に変換するため、コネクタは利用可能なすべてのコネクタで動作します。
重要
ソース コネクタと Sink コネクタに同じ変換パラメータを使用
MongoDB Kafka ソース コネクタと MongoDB Kafka シンク コネクタでは、同じコネクタを使用する必要があります。 たとえば、ソース コネクタが Protobuf を使用してトピックに書込む場合、Sink Connector は Protobuf を使用してトピックから読み取りを行う必要があります。
使用する ドライバーについては、 Confluent のこちらのページ を参照してください。
スキーマを使用した変換もの
KafkaKafkaKafkaJSON schema
スキーマを指定する方法については、 スキーマの適用ガイドをご覧ください。
Connector構成
このセクションでは、コネクタ パイプラインで次の変換を構成するためのプロパティ ファイルのテンプレートを提供します。
Avro Converter
Atlas 変換 で動作するプロパティ ファイルを表示するには、次のタブをクリックします。
以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、デフォルトのスキーマと Avro 変換を使用して Apache Kafka トピックに書き込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
重要
MongoDB データソースを使用した AES(A個)
Atlas フィルターは静的構造を持つデータには最適ですが、動的データや変化データには適していません。 MongoDBのスキーマレスdocument modelは動的データをサポートしているため、Atlas 変換を指定する前にMongoDBデータソースが静的構造であることを確認してください。
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、Avro 変換を使用して Apache Kafka トピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。
Protobuf Converter
Protobuf 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。
以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクターは、デフォルトのスキーマと Protobuf 変換を使用して Apache Kafka トピックに書込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、Protobuf 変換を使用して Apache Kafka トピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。
JSON schema 変換
JSON schema 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。
以下のプロパティ ファイルは、Confluent schema Registry を使用して JSON schema を管理するように connector を構成します。
以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクタは、デフォルト スキーマと JSON schema 変換を使用して Apache Kafka トピックに書込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON schema 変換を使用して Apache Kafka トピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下のプロパティ ファイルは、メッセージに JSON スキーマを埋め込むようにコネクターを構成します。
重要
メッセージ サイズ増加
メッセージに JSON schema を埋め込むと、メッセージのサイズが増大します。 JSON schema を使用中にメッセージのサイズを縮小するには、 schema Registry を使用します。
以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクタは、デフォルト スキーマと JSON schema 変換を使用して Apache Kafka トピックに書込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema output.schema.infer.value=true key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON schema 変換を使用して Apache Kafka トピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。
JSON 変換
JSON 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。
以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、JSON 変換を使用して Apache Kafka トピックに書き込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON 変換を使用して Apache Kafka トピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。
string変換子(Raw JSON )
string変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。
以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、 string変換を使用してApache Kafkaトピックに書き込みます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、 string変換を使用してApache Kafkaトピックから読み取りを行います。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
重要
受信した文字列は有効な JSON である必要があります
Sink connector は、string 変換を使用する場合でも、Apache Kafka トピックから有効な JSON string を受信する必要があります。
上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。