Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

変換子

項目一覧

  • Overview
  • 使用可能な変換子
  • スキーマを使用した変換もの
  • Connector構成
  • Avro Converter
  • Protobuf Converter
  • JSON schema 変換
  • JSON 変換
  • string変換子(Raw JSON )

このガイドでは、 で 変換 を使用するMongoDBKafka Connector 方法について説明します。変換子は、バイトと Kafka Connect のランタイム データ形式の間で変換されるプログラムです。

変換ツールは Kafka Connect と Apache Kafka 間でデータを渡します。 connectorは MongoDB と Kafka Connect 間でデータを渡します。 次の図は、これらの関係を示しています。

Kafka Connect における変換者の役割を示す図

変換者の詳細については、次のリソースを参照してください。

  • Confluent の記事。

  • Kafka Connect の概念に関する Confluent の記事

  • 変換用インターフェース API ドキュメント

コネクタが MongoDB データを Kafka Connect のランタイム データ形式に変換するため、コネクタは利用可能なすべてのコネクタで動作します。

重要

ソース コネクタと Sink コネクタに同じ変換パラメータを使用

MongoDB Kafka ソース コネクタと MongoDB Kafka シンク コネクタでは、同じコネクタを使用する必要があります。 たとえば、ソース コネクタが Protobuf を使用してトピックに書込む場合、Sink Connector は Protobuf を使用してトピックから読み取りを行う必要があります。

使用する ドライバーについては、 Confluent のこちらのページ を参照してください。

KafkaKafkaKafkaJSON schema

スキーマを指定する方法については、 スキーマの適用ガイドをご覧ください。

このセクションでは、コネクタ パイプラインで次の変換を構成するためのプロパティ ファイルのテンプレートを提供します。

Atlas 変換 で動作するプロパティ ファイルを表示するには、次のタブをクリックします。

以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、デフォルトのスキーマと Avro 変換を使用して Apache Kafka トピックに書き込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

重要

MongoDB データソースを使用した AES(A個)

Atlas フィルターは静的構造を持つデータには最適ですが、動的データや変化データには適していません。 MongoDBのスキーマレスdocument modelは動的データをサポートしているため、Atlas 変換を指定する前にMongoDBデータソースが静的構造であることを確認してください。

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、Avro 変換を使用して Apache Kafka トピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。

Protobuf 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。

以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクターは、デフォルトのスキーマと Protobuf 変換を使用して Apache Kafka トピックに書込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、Protobuf 変換を使用して Apache Kafka トピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。

JSON schema 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。

以下のプロパティ ファイルは、Confluent schema Registry を使用して JSON schema を管理するように connector を構成します。

以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクタは、デフォルト スキーマと JSON schema 変換を使用して Apache Kafka トピックに書込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON schema 変換を使用して Apache Kafka トピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下のプロパティ ファイルは、メッセージに JSON スキーマを埋め込むようにコネクターを構成します。

重要

メッセージ サイズ増加

メッセージに JSON schema を埋め込むと、メッセージのサイズが増大します。 JSON schema を使用中にメッセージのサイズを縮小するには、 schema Registry を使用します。

以下のプロパティ ファイルは、ソース コネクタを定義します。 このコネクタは、デフォルト スキーマと JSON schema 変換を使用して Apache Kafka トピックに書込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
output.schema.infer.value=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON schema 変換を使用して Apache Kafka トピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。

JSON 変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。

以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、JSON 変換を使用して Apache Kafka トピックに書き込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、JSON 変換を使用して Apache Kafka トピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。

string変換で動作するプロパティ ファイルを表示するには、次のタブをクリックします。

以下のプロパティ ファイルは、ソース コネクタを定義します。 この connector は、 string変換を使用してApache Kafkaトピックに書き込みます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

以下のプロパティ ファイルは、Sink Connector を定義します。 このコネクターは、 string変換を使用してApache Kafkaトピックから読み取りを行います。

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

重要

受信した文字列は有効な JSON である必要があります

Sink connector は、string 変換を使用する場合でも、Apache Kafka トピックから有効な JSON string を受信する必要があります。

上記のプロパティ ファイルを使用するには、角括弧 内のプレースホルダー テキストを 情報に置き換えます。

戻る

Data Formats