Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/

Replicar dados com um manipulador de captura de dados de alteração

Nesta página

  • Visão geral
  • Replicar dados com um controlador de CDC
  • Concluir a configuração do tutorial
  • Iniciar conchas interativas
  • Configurar o conector de origem
  • Configurar o conector do coletor
  • Monitorar o tópico do Kafka
  • Gravar dados na fonte e observar o fluxo de dados
  • (Opcional) Gerar alterações adicionais
  • Resumo
  • Saiba mais

Siga este tutorial para saber como usar um manipulador de captura de dados de alteração (CDC) para replicar dados com o Conector Kafka do MongoDB. Um manipulador de CDC é um aplicativo que traduz eventos de CDC em operações de gravação MongoDB. Use um manipulador CDC quando precisar reproduzir as alterações em um datastore em outro datastore.

Neste tutorial, você configura e executa conectores de origem e coletor do MongoDB Kafka para fazer com que duas coleções do MongoDB contenham os mesmos documentos usando CDC. O conector de origem grava os dados de fluxo de alteração da coleção original em um tópico do Kafka e o conector do coletor grava os dados do tópico do Kafka na coleção do MongoDB de destino.

Se você quiser saber mais sobre como funcionam os manipuladores de CDC, consulte o guia Alterar manipuladores de captura de dados .

1

Conclua as etapas da Configuração do tutorial do Conector Kafka para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB.

2

Inicie duas conchas interativas no recipiente Docker em janelas separadas. No tutorial, você pode usar as conchas para executar e observar tarefas diferentes.

Execute o seguinte comando a partir de um terminal para iniciar uma shell interativa.

docker exec -it mongo1 /bin/bash

Iremos referir-se a este shell interactivo como CDCShell1 durante este tutorial.

Execute o seguinte comando em um segundo terminal para iniciar uma shell interativa:

docker exec -it mongo1 /bin/bash

Iremos referir-se a este shell interactivo como CDCShell2 durante este tutorial.

Organize as duas janelas na tela para mantê-las visíveis e ver as atualizações em tempo real.

Utilize o CDCShell1 para configurar seus conectores e monitorar seu tópico do Kafka. Use CDCShell2 para executar operações de gravação no MongoDB.

3

Em CDCShell1, configure um conector de origem para ler a partir do namespace MongoDB do CDCTutorial.Source e escrever no tópico Kafka do CDCTutorial.Source.

Crie um arquivo de configuração denominado cdc-source.json utilizando o seguinte comando:

nano cdc-source.json

Cole as seguintes informações de configuração no arquivo e salve suas alterações:

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

Execute o seguinte comando em CDCShell1 para iniciar o conector de origem utilizando o arquivo de configuração que você criou:

cx cdc-source.json

Observação

O comando cx é um roteiro personalizado incluído no ambiente de desenvolvimento do tutorial. Este roteiro executa a seguinte solicitação equivalente à API REST do Kafka Connect para criar um novo conector:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

Execute o seguinte comando na shell para verificar o status dos conectores:

status

Se o conector de origem foi iniciado com sucesso, você deverá ver a seguinte saída:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

Em CDCShell1, configure um conector de pia para copiar dados do tópico Kafka do CDCTutorial.Source para o namespace MongoDB do CDCTutorial.Destination.

Crie um arquivo de configuração denominado cdc-sink.json utilizando o seguinte comando:

nano cdc-sink.json

Cole as seguintes informações de configuração no arquivo e salve suas alterações:

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

Execute o seguinte comando no shell para iniciar o conector do coletor usando o arquivo de configuração criado:

cx cdc-sink.json

Execute o seguinte comando na shell para verificar o status dos conectores:

status

Se o conector do coletor tiver sido iniciado com sucesso, você verá a seguinte saída:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

Em CDCShell1, monitore o tópico Kafka para eventos recebidos. Execute o seguinte comando para iniciar o aplicativo kafkacat que gera resultados de dados publicados no tópico:

kc CDCTutorial.Source

Observação

O comando kc é um script personalizado incluído no ambiente de desenvolvimento do tutorial que chama o aplicativo kafkacat com opções para conectar ao Kafka e formatar a saída do tópico especificado.

Depois de iniciado, você verá a seguinte saída que indica que atualmente não há dados para ler:

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

Em CDCShell2, conecte ao MongoDB utilizando o mongosh, o MongoDB shell executando o seguinte comando:

mongosh "mongodb://mongo1"

Depois de se conectar com sucesso, você deverá ver o seguinte prompt do shell do MongoDB:

rs0 [direct: primary] test>

No prompt, digite os seguintes comandos para inserir um novo documento no namespace MongoDB CDCTutorial.Source:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

Quando o MongoDB concluir o comando de inserção, você deverá receber uma confirmação que se assemelha ao texto a seguir:

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

O conector de origem pega a alteração e a publica no tópico Kafka. Você deve visualizar a seguinte mensagem de tópico em sua janela CDCShell1:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

O conector de coletor capta a mensagem Kafka e armazena os dados no MongoDB. Você pode recuperar o documento do namespace CDCTutorial.Destination no MongoDB executando o seguinte comando no MongoDB Shell iniciado no CDCShell2:

db.Destination.find()

O resultado deve ver o seguinte documento retornado:

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

Tente remover documentos do namespace CDCTutorial.Source executando o seguinte comando no shell do MongoDB:

db.Source.deleteMany({})

Você deve visualizar a seguinte mensagem de tópico em sua janela CDCShell1:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

Execute o seguinte comando para recuperar o número atual de documentos na coleção:

db.Destination.count()

Isso retorna a seguinte saída, indicando que a coleção está vazia:

0

Execute o seguinte comando para sair da shell MongoDB:

exit

Neste tutorial, você configura um conector de origem para capturar alterações em uma coleção MongoDB e enviá-los para o Apache Kafka. Você também configurou um conector de pia com um Controlador de CDC MongoDB para mover os dados do Apache Kafka para uma coleção MongoDB.

Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial:

Voltar

Começando com o conector do coletor MongoDB Kafka