Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Copiar dados existentes

Este exemplo de uso demonstra como copiar dados de uma coleção MongoDB para um tópico Apache Kafka usando o conector de origem MongoDB Kafka.

Suponha que você precise copiar uma coleção MongoDB para o Apache Kafka e filtrar alguns dos dados.

Suas necessidades e suas soluções são as seguintes:

Requerimento
várias plataformas
Copie a coleção customers do banco de dados do shopping em seu sistema MongoDB em um tópico do Apache Kafka.
See the Copy Data section of this guide.
Copie apenas documentos que tenham o valor "México" no campo country.
See the Filter Data section of this guide.

A coleção customers contém os seguintes documentos:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

Copie o conteúdo da coleção customers do banco de dados shopping especificando as seguintes opções de configuração em seu conector de origem:

database=shopping
collection=customers
startup.mode=copy_existing

Seu conector de origem copia sua coleção criando documentos de eventos de alteração que descrevem a inserção de cada documento em sua collection.

Observação

A cópia de dados pode produzir eventos duplicados

Se algum sistema alterar os dados no banco de dados enquanto o conector de origem converte dados existentes dele, o MongoDB poderá produzir eventos de fluxo de alterações duplicados para refletir as alterações mais recentes. Como os eventos de fluxo de alterações nos quais a cópia de dados depende são idempotentes, os dados copiados são eventualmente consistentes.

Para saber mais sobre como alterar documentos de evento , consulte o guia Change Streams .

Para saber mais sobre a opção startup.mode, consulte Propriedades de inicialização.

Você pode filtrar dados especificando um pipeline de agregação na opção startup.mode.copy.existing.pipeline da configuração do conector de origem. A seguinte configuração especifica um pipeline de agregação que corresponde a todos os documentos com "México" no campo country:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Para saber mais sobre a opção startup.mode.copy.existing.pipeline, consulte Propriedades de inicialização.

Para saber mais sobre pipelines de agregação, consulte os seguintes recursos:

A configuração do conector de origem final para copiar a coleção customers deve ser assim:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Depois que o conector copiar os dados, você verá o seguinte documento de evento de alteração correspondente à coleção de amostra anterior no tópico Apache Kafka shopping.customers:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

Observação

Escreva os dados em seu Tópico em uma Coleção

Use um manipulador de captura de dados de alteração para converter documentos de eventos de alteração em um tópico do Apache Kafka em operações de gravação do MongoDB. Para saber mais, consulte o guia Alterar manipuladores de captura de dados.

Voltar

Nomenclatura de tópicos