Começando com o conector de fonte MongoDB Kafka
Siga este tutorial para saber como configurar um conector de origem MongoDB Kafka para ler dados de um fluxo de alterações e publicá-los em um tópico Apache Kafka.
Começar com o conector de fonte MongoDB Kafka
Concluir a configuração do tutorial
Conclua as etapas daConfiguração do tutorial doKafka Connector para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB .
Configurar o conector de origem
Crie uma sessão de shell interativa no tutorial Docker container baixado para a configuração do tutorial usando o seguinte comando:
docker exec -it mongo1 /bin/bash
Crie um arquivo de configuração de origem denominado simplesource.json
com o seguinte comando:
nano simplesource.json
Cole as seguintes informações de configuração no arquivo e salve suas alterações:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
Execute o seguinte comando na shell para iniciar o conector de origem utilizando o arquivo de configuração que você criou:
cx simplesource.json
Observação
O comando cx
é um roteiro personalizado incluído no ambiente de desenvolvimento do tutorial. Este roteiro executa a seguinte solicitação equivalente à API REST do Kafka Connect para criar um novo conector:
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
Execute o seguinte comando na shell para verificar o status dos conectores:
status
Se o conector de origem foi iniciado com sucesso, você deverá ver a seguinte saída:
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
Criar eventos de alteração
Na mesma shell, conecte ao MongoDB utilizando o mongosh
, a shell MongoDB executando o seguinte comando:
mongosh "mongodb://mongo1"
Depois de se conectar com sucesso, você deverá ver o seguinte prompt do shell do MongoDB:
rs0 [direct: primary] test>
No prompt, digite os seguintes comandos para inserir um novo documento:
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
Quando o MongoDB concluir o comando de inserção, você deverá receber uma confirmação que se assemelha ao texto a seguir:
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
Saia da shell MongoDB inserindo o comando exit
.
Verifique o status do seu ambiente Kafka usando o seguinte comando:
status
Na saída do comando anterior, você deve ver o novo tópico que o conector de origem criou após receber o evento change:
... "topic": "Tutorial1.orders", ...
Confirme o conteúdo dos dados sobre o novo tópico do Kafka executando o seguinte comando:
kc Tutorial1.orders
Observação
O comando kc
é um script auxiliar que gera o conteúdo de um tópico Kafka.
Você deve ver os seguintes dados de tópico do Kafka, organizados pelas seções "Chave" e "Valor" ao executar o comando anterior:
Na seção "Valor" da saída, você pode encontrar a parte do payload
que inclui os dados do fullDocument
como destacado no seguinte documento JSON formatado:
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
Reconfigurar o change stream
Você pode omitir os metadados dos eventos criados pelo fluxo de alterações, configurando-o para retornar apenas o campo fullDocument
.
Pare o conector usando o seguinte comando:
del mongo-simple-source
Observação
O comando del
é um script auxiliar que chama a API REST do Kafka Connect para parar o conector e é equivalente ao seguinte comando:
curl -X DELETE connect:8083/connectors/<parameter>
Edite o arquivo de configuração de origem denominado simplesource.json
com o seguinte comando:
nano simplesource.json
Remova a configuração existente, adicione a seguinte configuração e salve o arquivo:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
Execute o seguinte comando na shell para iniciar o conector de origem usando o arquivo de configuração que você atualizou:
cx simplesource.json
Conecte ao MongoDB utilizando o mongosh
utilizando o seguinte comando:
mongosh "mongodb://mongo1"
No prompt, digite os seguintes comandos para inserir um novo documento:
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
Sair do mongosh
executando o seguinte comando:
exit
Confirme o conteúdo dos dados sobre o novo tópico do Kafka executando o seguinte comando:
kc Tutorial1.orders
O campo payload
no documento "Valor" deve conter apenas os seguintes dados do documento:
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(Opcional) Interromper os containers do Docker
Depois de concluir este tutorial, libere recursos em seu computador interrompendo ou removendo os ativos do Docker. Você pode remover os containers e as imagens do Docker, ou somente os containers. Se você remover os containers e as imagens, será necessário baixá-los novamente para reiniciar seu ambiente de desenvolvimento do Conector Kafka MongoDB, que tem aproximadamente 2,4 GB. Se você remover somente os containers, poderá reutilizar as imagens sem precisar baixar a maioria dos arquivos grandes na pipeline de dados de amostra.
Dica
Mais tutoriais
Se você pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover apenas os containers. Se você não pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover containers e imagens.
Selecione a aba que corresponde à tarefa de remoção que deseja executar.
Execute o seguinte comando shell para remover os containers e as imagens do Docker para o ambiente de desenvolvimento:
docker-compose -p mongo-kafka down --rmi all
Execute o seguinte comando shell para remover os containers do Docker, mas mantenha as imagens para o ambiente de desenvolvimento:
docker-compose -p mongo-kafka down
Para reiniciar os containers, siga as mesmas etapas necessárias para iniciá-los na configuração do tutorial.
Resumo
Neste tutorial, você iniciou um conector de origem usando configurações diferentes para alterar os dados de eventos de fluxo de alterações publicados em um tópico do Kafka.
Saiba mais
Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial: