Replicar dados com um manipulador de captura de dados de alteração
Nesta página
- Visão geral
- Replicar dados com um controlador de CDC
- Concluir a configuração do tutorial
- Iniciar conchas interativas
- Configurar o conector de origem
- Configurar o conector do coletor
- Monitorar o tópico do Kafka
- Gravar dados na fonte e observar o fluxo de dados
- (Opcional) Gerar alterações adicionais
- Resumo
- Saiba mais
Visão geral
Siga este tutorial para saber como usar um manipulador de captura de dados de alteração (CDC) para replicar dados com o Conector Kafka do MongoDB. Um manipulador de CDC é um aplicativo que traduz eventos de CDC em operações de gravação MongoDB. Use um manipulador CDC quando precisar reproduzir as alterações em um datastore em outro datastore.
Neste tutorial, você configura e executa conectores de origem e coletor do MongoDB Kafka para fazer com que duas coleções do MongoDB contenham os mesmos documentos usando CDC. O conector de origem grava os dados de fluxo de alteração da coleção original em um tópico do Kafka e o conector do coletor grava os dados do tópico do Kafka na coleção do MongoDB de destino.
Se você quiser saber mais sobre como funcionam os manipuladores de CDC, consulte o guia Alterar manipuladores de captura de dados .
Replicar dados com um controlador de CDC
Concluir a configuração do tutorial
Conclua as etapas da Configuração do tutorial do Conector Kafka para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB.
Iniciar conchas interativas
Inicie duas conchas interativas no recipiente Docker em janelas separadas. No tutorial, você pode usar as conchas para executar e observar tarefas diferentes.
Execute o seguinte comando a partir de um terminal para iniciar uma shell interativa.
docker exec -it mongo1 /bin/bash
Iremos referir-se a este shell interactivo como CDCShell1 durante este tutorial.
Execute o seguinte comando em um segundo terminal para iniciar uma shell interativa:
docker exec -it mongo1 /bin/bash
Iremos referir-se a este shell interactivo como CDCShell2 durante este tutorial.
Organize as duas janelas na tela para mantê-las visíveis e ver as atualizações em tempo real.
Utilize o CDCShell1 para configurar seus conectores e monitorar seu tópico do Kafka. Use CDCShell2 para executar operações de gravação no MongoDB.
Configurar o conector de origem
Em CDCShell1, configure um conector de origem para ler a partir do namespace MongoDB do CDCTutorial.Source
e escrever no tópico Kafka do CDCTutorial.Source
.
Crie um arquivo de configuração denominado cdc-source.json
utilizando o seguinte comando:
nano cdc-source.json
Cole as seguintes informações de configuração no arquivo e salve suas alterações:
{ "name": "mongo-cdc-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Source" } }
Execute o seguinte comando em CDCShell1 para iniciar o conector de origem utilizando o arquivo de configuração que você criou:
cx cdc-source.json
Observação
O comando cx
é um roteiro personalizado incluído no ambiente de desenvolvimento do tutorial. Este roteiro executa a seguinte solicitação equivalente à API REST do Kafka Connect para criar um novo conector:
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
Execute o seguinte comando na shell para verificar o status dos conectores:
status
Se o conector de origem foi iniciado com sucesso, você deverá ver a seguinte saída:
Kafka topics: ... The status of the connectors: source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ...
Configurar o conector do coletor
Em CDCShell1, configure um conector de pia para copiar dados do tópico Kafka do CDCTutorial.Source
para o namespace MongoDB do CDCTutorial.Destination
.
Crie um arquivo de configuração denominado cdc-sink.json
utilizando o seguinte comando:
nano cdc-sink.json
Cole as seguintes informações de configuração no arquivo e salve suas alterações:
{ "name": "mongo-cdc-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "CDCTutorial.Source", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Destination" } }
Execute o seguinte comando no shell para iniciar o conector do coletor usando o arquivo de configuração criado:
cx cdc-sink.json
Execute o seguinte comando na shell para verificar o status dos conectores:
status
Se o conector do coletor tiver sido iniciado com sucesso, você verá a seguinte saída:
Kafka topics: ... The status of the connectors: sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ...
Monitorar o tópico do Kafka
Em CDCShell1, monitore o tópico Kafka para eventos recebidos. Execute o seguinte comando para iniciar o aplicativo kafkacat
que gera resultados de dados publicados no tópico:
kc CDCTutorial.Source
Observação
O comando kc
é um script personalizado incluído no ambiente de desenvolvimento do tutorial que chama o aplicativo kafkacat
com opções para conectar ao Kafka e formatar a saída do tópico especificado.
Depois de iniciado, você verá a seguinte saída que indica que atualmente não há dados para ler:
% Reached end of topic CDCTutorial.Source [0] at offset 0
Gravar dados na fonte e observar o fluxo de dados
Em CDCShell2, conecte ao MongoDB utilizando o mongosh
, o MongoDB shell executando o seguinte comando:
mongosh "mongodb://mongo1"
Depois de se conectar com sucesso, você deverá ver o seguinte prompt do shell do MongoDB:
rs0 [direct: primary] test>
No prompt, digite os seguintes comandos para inserir um novo documento no namespace MongoDB CDCTutorial.Source
:
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" });
Quando o MongoDB concluir o comando de inserção, você deverá receber uma confirmação que se assemelha ao texto a seguir:
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
O conector de origem pega a alteração e a publica no tópico Kafka. Você deve visualizar a seguinte mensagem de tópico em sua janela CDCShell1:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8260..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } }, "wallTime": { "$date": "..." }, "fullDocument": { "_id": { "$oid": "600b38ad..." }, "proclaim": "Hello World!" }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "600b38a..." } } } }
O conector de coletor capta a mensagem Kafka e armazena os dados no MongoDB. Você pode recuperar o documento do namespace CDCTutorial.Destination
no MongoDB executando o seguinte comando no MongoDB Shell iniciado no CDCShell2:
db.Destination.find()
O resultado deve ver o seguinte documento retornado:
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World' } ]
(Opcional) Gerar alterações adicionais
Tente remover documentos do namespace CDCTutorial.Source
executando o seguinte comando no shell do MongoDB:
db.Source.deleteMany({})
Você deve visualizar a seguinte mensagem de tópico em sua janela CDCShell1:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8261...." }, ... "operationType": "delete", "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "6138..." } } } }
Execute o seguinte comando para recuperar o número atual de documentos na coleção:
db.Destination.count()
Isso retorna a seguinte saída, indicando que a coleção está vazia:
0
Execute o seguinte comando para sair da shell MongoDB:
exit
Resumo
Neste tutorial, você configura um conector de origem para capturar alterações em uma coleção MongoDB e enviá-los para o Apache Kafka. Você também configurou um conector de pia com um Controlador de CDC MongoDB para mover os dados do Apache Kafka para uma coleção MongoDB.
Saiba mais
Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial: