Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Especificar um esquema

Este exemplo de uso demonstra como você pode configurar o conector de origem do MongoDB Kafka para aplicar um esquema personalizado aos seus dados. Um esquema é uma definição que especifica a estrutura e digita informações sobre dados em um tópico do Apache Kafka. Use um esquema quando precisar garantir que os dados sobre o tópico preenchidos pelo conector de origem tenham uma estrutura consistente.

Para saber mais sobre como usar esquemas com o conector, consulte o guia Aplicar esquemas .

Suponha que seu aplicativo mantenha o controle dos dados do cliente em uma coleção MongoDB e você precisa publicar esses dados em um tópico do Kafka. Você deseja que os assinantes dos dados do cliente recebam dados formatados consistentemente. Você escolhe aplicar um esquema aos seus dados.

Suas necessidades e suas soluções são as seguintes:

Requerimento
várias plataformas
Receber dados de clientes de uma coleção MongoDB
Configure a MongoDB source connector to receive updates to data from a specific database and collection.
See Receive Data from a Collection.
Fornecer o esquema de dados do cliente
Specify a schema that corresponds to the structure and data types of the customer data.
Omitir metadados do Kafka dos dados do cliente
Include only the data from the fullDocument field.

Para o arquivo de configuração completo que atende aos requisitos acima, consulte Especificar a configuração.

Para configurar seu conector de origem para receber dados de uma coleção MongoDB, especifique o nome do banco de dados e da coleção. Para este exemplo, você pode configurar o conector para ler a partir da coleção purchases no banco de dados customers como segue:

database=customers
collection=purchases

Um exemplo de documento de dados do cliente da sua coleção contém as seguintes informações:

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

A partir do documento de amostra, você decide que seu esquema deve apresentar os campos usando os seguintes tipos de dados:

Nome do campo
Tipos de dados
Descrição
name
Name of the customer
visitas
Datas em que o cliente visitou
bens_comprados
mapear de string (o tipo assumido) para valores inteiros
Nomes de mercadorias e quantidade de cada item que o cliente comprou

Você pode descrever seus dados usando o formato de esquema Apache Avro, como mostrado no esquema de exemplo abaixo:

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

Importante

Conversores

Para enviar seus dados com codificação binário Avro através do Apache Kafka, é necessário utilizar um conversor Avro. Para obter mais informações, consulte o guia sobre Conversores.

O conector publica os documentos e metadados de dados do cliente que descrevem o documento em um tópico do Kafka. Você pode definir o conector para incluir somente os dados do documento contidos no campo fullDocument do registro usando a seguinte configuração:

publish.full.document.only=true

Para obter mais informações sobre o campo fullDocument , consulte o guia Change Streams .

A configuração do conector do esquema personalizado deve se assemelhar ao seguinte:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Observação

Esquema incorporado

Na configuração anterior, o conversor de JSON schema do Kafka Connect incorpora o esquema personalizado em suas mensagens. Para saber mais sobre o conversor de JSON schema, consulte o guia Conversores.

Para mais informações sobre como especificar esquemas, consulte o guia Aplicar Esquemas.

Voltar

Copiar dados existentes