Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

转换器

在此页面上

  • Overview
  • 可用转换器
  • 带模式的转换器
  • 连接器配置
  • Avro Converter
  • Protobuf Converter
  • JSON 架构转换器
  • JSON 转换器
  • 字符串转换器(原始 JSON)

本指南介绍如何在 MongoDB Kafka Connector 中使用转换器。转换器是在字节和 Kafka Connect 运行时数据格式之间进行转换的程序。

转换器在 Kafka Connect 和 Apache Kafka 之间传递数据。该连接器在 MongoDB 和 Kafka Connect 之间传递数据。下图显示了这些关系:

该图说明了转换器在 Kafka Connect 中的作用

要了解有关转换器的更多信息,请参阅以下资源:

  • 来自 Confluence 的文章。

  • 关于 Kafka Connect 概念的 Confluence 文章

  • 转换器接口 API 文档

当连接器将 MongoDB 数据转换为 Kafka Connect 的运行时数据格式时,连接器可与所有可用的转换器配合使用。

重要

对源连接器和接收器连接器使用相同的转换器

您必须在 MongoDB Kafka 源连接器和 MongoDB Kafka 目标连接器中使用相同的转换器。例如,如果源连接器使用 Protobuf 向主题写入,那么目标连接器就必须使用 Protobuf 从主题读取。

要了解要使用的转换器, 请参阅 Confluence 的此页面。

如果使用基于架构的转换器(例如 Kafka Connect Avro Converter (Avro Converter)、Kafka Connect Protobuf Converter 或 Kafka Connect JSON Schema Converter),您应该在源连接器中定义架构。

要学习;了解如何指定模式,请参阅应用模式指南。

本节提供了属性文件模板,用于在连接器管道中配置以下转换器:

单击以下选项卡以查看与 Avro Converter 一起使用的属性文件:

以下属性文件定义源连接器。此连接器使用默认模式和 Avro 转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

重要

带有 MongoDB 数据源的 Avro Converter

Avro 转换器非常适合采用静态结构的数据,但不太适合动态或不断变化的数据。MongoDB 的无模式文档模型支持动态数据,因此在指定 Avro 转换器之前,请确保您的 MongoDB 数据源采用静态结构。

以下属性文件定义了一个 SinkConnector。 此Connector使用 Avro 转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。

单击以下标签页可查看与 Protobuf 转换器配合使用的属性文件:

以下属性文件定义源连接器。此连接器使用默认模式和 Protobuf 转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件定义了一个 SinkConnector。 此Connector使用 Protobuf 转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。

单击以下选项卡以查看与 JSON Schema Converter 一起使用的属性文件:

以下属性文件将您的连接器配置为使用 Confluence 模式注册表管理 JSON 模式:

以下属性文件定义源连接器。此连接器使用默认模式和 JSON 模式转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件定义了一个 SinkConnector。 此Connector使用JSON schema 转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件将Connector配置为在消息中嵌入JSON schema:

重要

增加消息大小

在消息中嵌入JSON schema 会增加消息的大小。 要在使用JSON schema 时减小消息的大小,请使用模式注册表。

以下属性文件定义源连接器。此连接器使用默认模式和 JSON 模式转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
output.schema.infer.value=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下属性文件定义了一个 SinkConnector。 此Connector使用JSON schema 转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。

单击以下选项卡,查看与 JSON 转换器搭配使用的属性文件:

以下属性文件定义源连接器。此连接器使用 JSON 转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下属性文件定义了一个 SinkConnector。 此Connector使用JSON转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。

单击以下标签页可查看与字符串转换器配合使用的属性文件:

以下属性文件定义源连接器。此连接器使用 String 转换器写入 Apache Kafka 主题:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

以下属性文件定义了一个 SinkConnector。 此Connector使用string转换器从Apache Kafka主题读取:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

重要

收到的字符串必须是有效的JSON

即使使用 转换器,接收器Connector也必须从 主题接收有效的JSON ApacheKafkastring字符串。

要使用前面的属性文件,请将尖括号中的占位符文本替换为您的信息。

后退

Data Formats