转换器
Overview
本指南介绍如何在 MongoDB Kafka Connector 中使用转换器。转换器是在字节和 Kafka Connect 运行时数据格式之间进行转换的程序。
转换器在 Kafka Connect 和 Apache Kafka 之间传递数据。该连接器在 MongoDB 和 Kafka Connect 之间传递数据。下图显示了这些关系:
要了解有关转换器的更多信息,请参阅以下资源:
可用转换器
当连接器将 MongoDB 数据转换为 Kafka Connect 的运行时数据格式时,连接器可与所有可用的转换器配合使用。
重要
对源连接器和接收器连接器使用相同的转换器
您必须在 MongoDB Kafka 源连接器和 MongoDB Kafka 目标连接器中使用相同的转换器。例如,如果源连接器使用 Protobuf 向主题写入,那么目标连接器就必须使用 Protobuf 从主题读取。
要了解要使用的转换器, 请参阅 Confluence 的此页面。
带模式的转换器
如果使用基于架构的转换器(例如 Kafka Connect Avro Converter (Avro Converter)、Kafka Connect Protobuf Converter 或 Kafka Connect JSON Schema Converter),您应该在源连接器中定义架构。
要学习;了解如何指定模式,请参阅应用模式指南。
连接器配置
本节提供了属性文件模板,用于在连接器管道中配置以下转换器:
Avro Converter
单击以下选项卡以查看与 Avro Converter 一起使用的属性文件:
以下属性文件定义源连接器。此连接器使用默认模式和 Avro 转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
重要
带有 MongoDB 数据源的 Avro Converter
Avro 转换器非常适合采用静态结构的数据,但不太适合动态或不断变化的数据。MongoDB 的无模式文档模型支持动态数据,因此在指定 Avro 转换器之前,请确保您的 MongoDB 数据源采用静态结构。
以下属性文件定义了一个 SinkConnector。 此Connector使用 Avro 转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。
Protobuf Converter
单击以下标签页可查看与 Protobuf 转换器配合使用的属性文件:
以下属性文件定义源连接器。此连接器使用默认模式和 Protobuf 转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件定义了一个 SinkConnector。 此Connector使用 Protobuf 转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。
JSON 架构转换器
单击以下选项卡以查看与 JSON Schema Converter 一起使用的属性文件:
以下属性文件将您的连接器配置为使用 Confluence 模式注册表管理 JSON 模式:
以下属性文件定义源连接器。此连接器使用默认模式和 JSON 模式转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件定义了一个 SinkConnector。 此Connector使用JSON schema 转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件将Connector配置为在消息中嵌入JSON schema:
重要
增加消息大小
在消息中嵌入JSON schema 会增加消息的大小。 要在使用JSON schema 时减小消息的大小,请使用模式注册表。
以下属性文件定义源连接器。此连接器使用默认模式和 JSON 模式转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema output.schema.infer.value=true key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下属性文件定义了一个 SinkConnector。 此Connector使用JSON schema 转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。
JSON 转换器
单击以下选项卡,查看与 JSON 转换器搭配使用的属性文件:
以下属性文件定义源连接器。此连接器使用 JSON 转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下属性文件定义了一个 SinkConnector。 此Connector使用JSON转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。
字符串转换器(原始 JSON)
单击以下标签页可查看与字符串转换器配合使用的属性文件:
以下属性文件定义源连接器。此连接器使用 String 转换器写入 Apache Kafka 主题:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
以下属性文件定义了一个 SinkConnector。 此Connector使用string转换器从Apache Kafka主题读取:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
重要
收到的字符串必须是有效的JSON
即使使用 转换器,接收器Connector也必须从 主题接收有效的JSON ApacheKafkastring字符串。
要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。