Início rápido Kafka Connector
Nesta página
Visão geral
Este guia mostra como configurar o Conector Kafka MongoDB para enviar dados entre MongoDB e Apache Kafka.
Após concluir este guia, você deve entender como usar a API REST do Kafka Connect para configurar os Kafka Connectors MongoDB para ler dados do MongoDB e escrevê-los em um tópico do Kafka, e para ler dados de um tópico do Kafka e escrevê-los no MongoDB.
Para concluir as etapas deste guia, você deve baixar e trabalhar em um sandbox, um ambiente de desenvolvimento em contêiner que inclui serviços necessários para criar uma amostra de pipeline de dados.
Leia as seções abaixo para configurar a sandbox e o pipeline de dados de amostra.
Observação
Depois de concluir o guia, você pode remover o ambiente seguindo as instruções na seção Remover o Sandbox.
Instale os pacotes necessários
Baixe e instale os seguintes pacotes:
Dica
Leia a documentação do Docker
Este guia usa a seguinte terminologia específica do Docker:
Saiba mais sobre o Docker no Guia de Introdução oficial do Docker.
A sandbox usa o Docker para oferecer maior conveniência e consistência. Para saber mais sobre as opções de implantação do Apache Kafka, consulte os seguintes recursos:
Baixe o Sandbox
Criamos uma sandbox que inclui os serviços necessários neste tutorial para criar uma pipeline de dados de amostra.
Para baixar a sandbox, clone o repositório de tutoriais para seu ambiente de desenvolvimento. Em seguida, navegue até o diretório que corresponde ao tutorial de início rápido. Se você usar bash ou shell semelhante, use os seguintes comandos:
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
Iniciar o Sandbox
A sandbox inicia os seguintes serviços nos containers do Docker:
MongoDB, configurado como um conjunto de réplicas
Apache Kafka
Kafka Connect com o conector Kafka MongoDB instalado
Apache Zookeeper que gerencia a configuração do Apache Kafka
Para iniciar o sandbox, execute o seguinte comando no diretório do tutorial:
docker compose -p mongo-kafka up -d --force-recreate
Ao iniciar a sandbox, o Docker baixa todas as imagens necessárias para a execução.
Observação
Quanto tempo leva para baixar?
No total, as imagens do Docker para este tutorial exigem cerca de 2,4 GB de espaço. A lista abaixo mostra quanto tempo leva para baixar as imagens com diferentes velocidades de internet:
40 megabits por segundo: 8 minutos
20 megabits por segundo: 16 minutos
10 megabits por segundo: 32 minutos
Após o Docker baixar e construir as imagens, você verá a seguinte saída em seu ambiente de desenvolvimento:
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
Observação
Mapeamentos de portas
O sandbox mapeia os seguintes serviços para portas na sua máquina host:
O servidor de sandbox MongoDB mapeia a porta
35001
em sua máquina hostO servidor sandbox Kafka Connect JMX mapeia para a porta
35000
em sua máquina host
Essas portas devem estar livres para iniciar o sandbox.
Adicionar conectores
Para concluir o pipeline de dados de amostra, é preciso adicionar conectores ao Kafka Connect para transferir dados entre o Kafka Connect e o MongoDB. Adicione um conector de origem para transferir dados do MongoDB para o Apache Kafka. Adicione um conector de coletor para transferir dados do Apache Kafka para o MongoDB.
Para adicionar conectores ao sandbox, primeiro inicie um shell bash interativo no seu container do Docker usando o seguinte comando:
docker exec -it mongo1 /bin/bash
Após o início da sessão da shell, você verá a seguinte solicitação:
MongoDB Kafka Connector Sandbox $
Adicionar um conector de origem
Use a shell em seu container do Docker para adicionar um conector de origem usando a API REST do Kafka Connect.
A seguinte solicitação de API adiciona um conector de origem configurado com as seguintes propriedades:
A classe Kafka Connect usa para instanciar o conector
A URI de conexão, banco de dados e collection do conjunto de réplicas do MongoDB do qual o conector lê dados
Uma aggregation pipeline que adiciona um campo
travel
com o valor"MongoDB Kafka Connector"
aos documentos inseridos que o conector lê do MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
Observação
Por que vejo a mensagem "Falha ao conectar"?
O início da API REST do Kafka Connect leva até três minutos. Se você receber o seguinte erro, aguarde três minutos e execute o comando anterior novamente:
... curl: (7) Failed to connect to connect port 8083: Connection refused
Para confirmar que adicionou o conector de origem, execute o seguinte comando:
curl -X GET http://connect:8083/connectors
O comando anterior deve produzir os nomes dos conectores em execução:
["mongo-source"]
Para saber mais sobre as propriedades do conector de origem, consulte a página sobre Propriedades de configuração do Connector de origem.
Para saber mais sobre aggregation pipelines, consulte a página do Manual do MongoDB em Aggregation pipelines.
Adicionar um conector de coletor
Use a shell em seu container do Docker para adicionar um conector de coletor usando a API REST do Kafka Connect.
A seguinte solicitação de API adiciona um conector de coletor configurado com as seguintes propriedades:
A classe Kafka Connect usa para instanciar o conector
A URI de conexão, banco de dados e collection do conjunto de réplicas do MongoDB para o qual o conector grava dados
O tópico Apache Kafka do qual o conector lê dados
Um manipulador de captura de dados de alteração para documentos de alteração de eventos do MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
Para confirmar que você adicionou o conector de origem e de coletor, execute o seguinte comando:
curl -X GET http://connect:8083/connectors
O comando anterior deve produzir os nomes dos conectores em execução:
["mongo-source", "mongo-sink"]
Para saber mais sobre as propriedades do conector de coletor, consulte a página sobre Propriedades de configuração do conector de coletores.
Para saber mais sobre como alterar eventos de captura de dados, consulte o guia Manipulador de captura de dados de alteração.
Envie o conteúdo de um documento por meio de conectores
Para enviar o conteúdo de um documento pelos conectores, insira um documento na collection do MongoDB do qual o conector de origem lerá os dados.
Para inserir um novo documento na sua collection, insira o shell do MongoDB do shell no container do Docker usando o seguinte comando:
mongosh mongodb://mongo1:27017/?replicaSet=rs0
Depois de executar o comando anterior, você deverá ver o seguinte prompt:
rs0 [primary] test>
No MongoDB shell, insira um documento na collection sampleData
do banco de dados de quickstart
usando os seguintes comandos:
use quickstart db.sampleData.insertOne({"hello":"world"})
Depois de inserir um documento na collection sampleData
, confira se seus conectores processaram a alteração. Verifique o conteúdo da collection topicData
usando o seguinte comando:
db.topicData.find()
Você deverá ver uma saída semelhante à seguinte:
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
Saia do MongoDB shell com o seguinte comando:
exit
Remover a Sandbox
Para conservar recursos em seu ambiente de desenvolvimento, remova a sandbox.
Antes de remover a sandbox, saia da sessão de shell em seu container Docker executando o seguinte comando:
exit
Você pode remover os containers e as imagens do Docker, ou somente os containers. Se você remover os containers e as imagens, deverá baixá-los novamente para reiniciar sua sandbox, que tem aproximadamente 2,4 GB de tamanho. Se você remover somente os containers, poderá reutilizar as imagens sem precisar baixar a maioria dos arquivos grandes na pipeline de dados de amostra.
Selecione a aba que corresponde à tarefa de remoção que deseja executar.
Execute o seguinte comando de shell para remover os contêineres e imagens do Docker da sandbox:
docker-compose -p mongo-kafka down --rmi all
Execute o seguinte comando de shell para remover os contêineres do Docker, mas mantenha as imagens para a sandbox:
docker-compose -p mongo-kafka down
Próximos passos
Para instalar o Conector Kafka do MongoDB, veja o guia Como instalar o Conector Kafka do MongoDB.
Para saber mais sobre como processar e mover dados do Apache Kafka para o MongoDB, veja o guia Connector de coletores.
Para saber mais sobre como processar e mover dados do MongoDB para o Apache Kafka, veja o guia Conector de origem.