Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/ /

指定模式

此用法示例演示了如何配置MongoDB Kafka源Connector以应用自定义模式应用于数据。 模式是指定Apache Kafka主题中数据的结构和类型信息的定义。 当您必须确保源Connector填充的主题数据具有一致的结构时,请使用模式。

要学习;了解有关通过Connector使用模式的更多信息,请参阅应用模式指南。

假设您的应用程序追踪MongoDB集合中的客户数据,并且您想将此数据发布到Kafka主题。 您希望客户数据的订阅者收到格式一致的数据。 您可以选择对数据应用模式。

要求和解决方案如下:

要求
解决方案
从 MongoDB 集合接收客户数据
Configure a MongoDB source connector to receive updates to data from a specific database and collection.
See Receive Data from a Collection.
提供客户数据模式
Specify a schema that corresponds to the structure and data types of the customer data.
从客户数据中省略 Kafka 元数据
Include only the data from the fullDocument field.

有关满足上述要求的完整配置文件,请参阅指定配置。

要将 Source 连接器配置为从 MongoDB 集合接收数据,请指定数据库和集合名称。对于本示例,您可以将连接器配置为从 customers 数据库中的 purchases 集合读取,如下所示:

database=customers
collection=purchases

集合中的示例客户数据文档包含以下信息:

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

在样本文档中,您决定模式应使用以下数据类型显示字段:

字段名称
数据类型
说明
名称
Name of the customer
visits
客户访问日期
goods_purchased
地图 string(假定类型)转换为 整数
客户购买的商品名称和每件商品的数量

您可以使用 Apache Avro 模式格式描述您的数据,如以下示例模式所示:

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

重要

转换器

如果您想使用 Avro 二进制编码通过 Apache Kafka 发送数据,则必须使用 Avro 转换器。有关更多信息,请参阅转换器指南。

连接器将客户数据文档和描述该文档的元数据发布到 Kafka 主题。您可以使用以下设置,将连接器设置为仅纳入该记录的 fullDocument 字段中包含的文档数据:

publish.full.document.only=true

有关 fullDocument 字段的更多信息,请参阅变更流指南。

自定义模式连接器配置应如下所示:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

注意

嵌入式模式

在上述配置中,Kafka Connect JSON 模式转换器将自定义模式嵌入消息中。要了解有关 JSON 模式转换器的更多信息,请参阅转换器指南。

有关指定模式的更多信息,请参阅应用模式指南。

后退

复制现有数据