Especificar um esquema
Este exemplo de uso demonstra como você pode configurar o conector de origem do MongoDB Kafka para aplicar um esquema personalizado aos seus dados. Um esquema é uma definição que especifica a estrutura e digita informações sobre dados em um tópico do Apache Kafka. Use um esquema quando precisar garantir que os dados sobre o tópico preenchidos pelo conector de origem tenham uma estrutura consistente.
Para saber mais sobre como usar esquemas com o conector, consulte o guia Aplicar esquemas .
Exemplo
Suponha que seu aplicação mantenha o controle dos dados do cliente em uma coleção MongoDB e você deseja publicar esses dados em um tópico do Kafka. Você deseja que os assinantes dos dados do cliente recebam dados formatados consistentemente. Você escolhe aplicar um esquema aos seus dados.
Suas necessidades e suas soluções são as seguintes:
Requerimento | várias plataformas |
---|---|
Receber dados de clientes de uma coleção MongoDB | Configure a MongoDB source connector to receive updates to data
from a specific database and collection. |
Fornecer o esquema de dados do cliente | Specify a schema that corresponds to the structure and data types of
the customer data. |
Omitir metadados do Kafka dos dados do cliente | Include only the data from the fullDocument field. |
Para o arquivo de configuração completo que atende aos requisitos acima, consulte Especificar a configuração.
Receber dados de uma coleção
Para configurar seu conector de origem para receber dados de uma coleção MongoDB, especifique o nome do banco de dados e da coleção. Para este exemplo, você pode configurar o conector para ler a partir da coleção purchases
no banco de dados customers
como segue:
database=customers collection=purchases
Criar um esquema personalizado
Um exemplo de documento de dados do cliente da sua coleção contém as seguintes informações:
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
A partir do documento de amostra, você decide que seu esquema deve apresentar os campos usando os seguintes tipos de dados:
Nome do campo | Tipos de dados | Descrição |
---|---|---|
name | Name of the customer | |
visitas | array of timestamps | Datas em que o cliente visitou |
bens_comprados | Nomes de mercadorias e quantidade de cada item que o cliente comprou |
Você pode descrever seus dados usando o formato de esquema Apache Avro, como mostrado no esquema de exemplo abaixo:
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
Importante
Conversores
Para enviar seus dados com codificação binário Avro através do Apache Kafka, é necessário utilizar um conversor Avro. Para obter mais informações, consulte o guia sobre Conversores.
Omitir metadados de registros publicados
O conector publica os documentos e metadados de dados do cliente que descrevem o documento em um tópico do Kafka. Você pode definir o conector para incluir somente os dados do documento contidos no campo fullDocument
do registro usando a seguinte configuração:
publish.full.document.only=true
Para obter mais informações sobre o campo fullDocument
, consulte o guia Change Streams .
Especifique a configuração
A configuração do conector do esquema personalizado deve se assemelhar ao seguinte:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
Observação
Esquema incorporado
Na configuração anterior, o conversor de JSON schema do Kafka Connect incorpora o esquema personalizado em suas mensagens. Para saber mais sobre o conversor de JSON schema, consulte o guia Conversores.
Para mais informações sobre como especificar esquemas, consulte o guia Aplicar Esquemas.