Copiar dados existentes
Este exemplo de uso demonstra como copiar dados de uma coleção MongoDB para um tópico Apache Kafka usando o conector de origem MongoDB Kafka.
Exemplo
Suponha que você precise copiar uma coleção MongoDB para o Apache Kafka e filtrar alguns dos dados.
Suas necessidades e suas soluções são as seguintes:
Requerimento | várias plataformas |
---|---|
Copie a coleção customers do banco de dados do shopping em seu sistema MongoDB em um tópico do Apache Kafka. | See the Copy Data section of this guide. |
Copie apenas documentos que tenham o valor "México" no campo country . | See the Filter Data section of this guide. |
A coleção customers
contém os seguintes documentos:
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
Copiar dados
Copie o conteúdo da coleção customers
do banco de dados shopping
especificando as seguintes opções de configuração em seu conector de origem:
database=shopping collection=customers startup.mode=copy_existing
Seu conector de origem copia sua coleção criando documentos de eventos de alteração que descrevem a inserção de cada documento em sua collection.
Observação
A cópia de dados pode produzir eventos duplicados
Se algum sistema alterar os dados no banco de dados enquanto o conector de origem converte dados existentes dele, o MongoDB poderá produzir eventos de fluxo de alterações duplicados para refletir as alterações mais recentes. Como os eventos de fluxo de alterações nos quais a cópia de dados depende são idempotentes, os dados copiados são eventualmente consistentes.
Para saber mais sobre como alterar documentos de evento , consulte o guia Change Streams .
Para saber mais sobre a opção startup.mode
, consulte Propriedades de inicialização.
Filtre dados
Você pode filtrar dados especificando um pipeline de agregação na opção startup.mode.copy.existing.pipeline
da configuração do conector de origem. A seguinte configuração especifica um pipeline de agregação que corresponde a todos os documentos com "México" no campo country
:
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
Para saber mais sobre a opção startup.mode.copy.existing.pipeline
, consulte Propriedades de inicialização.
Para saber mais sobre pipelines de agregação, consulte os seguintes recursos:
Exemplo de uso: Personalizar um pipeline para filtrar eventos de alteração
Agregação no Manual do MongoDB
Especifique a configuração
A configuração do conector de origem final para copiar a coleção customers
deve ser assim:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
Depois que o conector copiar os dados, você verá o seguinte documento de evento de alteração correspondente à coleção de amostra anterior no tópico Apache Kafka shopping.customers
:
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
Observação
Escreva os dados em seu Tópico em uma Coleção
Use um manipulador de captura de dados de alteração para converter documentos de eventos de alteração em um tópico do Apache Kafka em operações de gravação do MongoDB. Para saber mais, consulte o guia Alterar manipuladores de captura de dados.