指定模式
此用法示例演示了如何配置MongoDB Kafka源Connector以应用自定义模式应用于数据。 模式是指定Apache Kafka主题中数据的结构和类型信息的定义。 当您必须确保源Connector填充的主题数据具有一致的结构时,请使用模式。
要学习;了解有关通过Connector使用模式的更多信息,请参阅应用模式指南。
例子
假设您的应用程序追踪MongoDB集合中的客户数据,并且您想将此数据发布到Kafka主题。 您希望客户数据的订阅者收到格式一致的数据。 您可以选择对数据应用模式。
要求和解决方案如下:
要求 | 解决方案 |
---|---|
从 MongoDB 集合接收客户数据 | Configure a MongoDB source connector to receive updates to data
from a specific database and collection. |
提供客户数据模式 | Specify a schema that corresponds to the structure and data types of
the customer data. |
从客户数据中省略 Kafka 元数据 | Include only the data from the fullDocument field. |
有关满足上述要求的完整配置文件,请参阅指定配置。
从集合中接收数据
要将 Source 连接器配置为从 MongoDB 集合接收数据,请指定数据库和集合名称。对于本示例,您可以将连接器配置为从 customers
数据库中的 purchases
集合读取,如下所示:
database=customers collection=purchases
创建自定义模式
集合中的示例客户数据文档包含以下信息:
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
在样本文档中,您决定模式应使用以下数据类型显示字段:
字段名称 | 数据类型 | 说明 |
---|---|---|
名称 | Name of the customer | |
visits | 客户访问日期 | |
goods_purchased | 客户购买的商品名称和每件商品的数量 |
您可以使用 Apache Avro 模式格式描述您的数据,如以下示例模式所示:
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
从已发布记录中省略元数据
连接器将客户数据文档和描述该文档的元数据发布到 Kafka 主题。您可以使用以下设置,将连接器设置为仅纳入该记录的 fullDocument
字段中包含的文档数据:
publish.full.document.only=true
有关 fullDocument
字段的更多信息,请参阅变更流指南。
指定配置
自定义模式连接器配置应如下所示:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
有关指定模式的更多信息,请参阅应用模式指南。