Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector

Início rápido do conector Kafka

Nesta página

  • Visão geral
  • Instale os pacotes necessários
  • Baixe o Sandbox
  • Iniciar o Sandbox
  • Adicionar conectores
  • Adicionar um conector de origem
  • Adicionar um conector de coletor
  • Envie o conteúdo de um documento por meio de conectores
  • Remover a Sandbox
  • Próximos passos

Este guia mostra como configurar o Conector Kafka MongoDB para enviar dados entre MongoDB e Apache Kafka.

Após concluir este guia, você deve entender como usar a API REST do Kafka Connect para configurar os Kafka Connectors MongoDB para ler dados do MongoDB e escrevê-los em um tópico do Kafka, e para ler dados de um tópico do Kafka e escrevê-los no MongoDB.

Para concluir as etapas deste guia, você deve baixar e trabalhar em um sandbox, um ambiente de desenvolvimento em contêiner que inclui serviços necessários para criar uma amostra de pipeline de dados.

Leia as seções abaixo para configurar a sandbox e o pipeline de dados de amostra.

Observação

Depois de concluir o guia, você pode remover o ambiente seguindo as instruções na seção Remover o Sandbox.

Baixe e instale os seguintes pacotes:

Dica

Leia a documentação do Docker

Este guia usa a seguinte terminologia específica do Docker:

Saiba mais sobre o Docker no Guia de Introdução oficial do Docker.

A sandbox usa o Docker para oferecer maior conveniência e consistência. Para saber mais sobre as opções de implantação do Apache Kafka, consulte os seguintes recursos:

Criamos uma sandbox que inclui os serviços necessários neste tutorial para criar uma pipeline de dados de amostra.

Para baixar a sandbox, clone o repositório de tutoriais para seu ambiente de desenvolvimento. Em seguida, navegue até o diretório que corresponde ao tutorial de início rápido. Se você usar bash ou shell semelhante, use os seguintes comandos:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

A sandbox inicia os seguintes serviços nos containers do Docker:

  • MongoDB, configurado como um conjunto de réplicas

  • Apache Kafka

  • Kafka Connect com o conector Kafka MongoDB instalado

  • Apache Zookeeper que gerencia a configuração do Apache Kafka

Para iniciar o sandbox, execute o seguinte comando no diretório do tutorial:

docker-compose -p mongo-kafka up -d --force-recreate

Ao iniciar a sandbox, o Docker baixa todas as imagens necessárias para a execução.

Observação

Quanto tempo leva para baixar?

No total, as imagens do Docker para este tutorial exigem cerca de 2,4 GB de espaço. A lista abaixo mostra quanto tempo leva para baixar as imagens com diferentes velocidades de internet:

  • 40 megabits por segundo: 8 minutos

  • 20 megabits por segundo: 16 minutos

  • 10 megabits por segundo: 32 minutos

Após o Docker baixar e construir as imagens, você verá a seguinte saída em seu ambiente de desenvolvimento:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

Observação

Mapeamentos de portas

O sandbox mapeia os seguintes serviços para portas na sua máquina host:

  • O servidor de sandbox MongoDB mapeia a porta 35001 em sua máquina host

  • O servidor sandbox Kafka Connect JMX mapeia para a porta 35000 em sua máquina host

Essas portas devem estar livres para iniciar o sandbox.

Para concluir o pipeline de dados de amostra, é preciso adicionar conectores ao Kafka Connect para transferir dados entre o Kafka Connect e o MongoDB. Adicione um conector de origem para transferir dados do MongoDB para o Apache Kafka. Adicione um conector de coletor para transferir dados do Apache Kafka para o MongoDB.

Para adicionar conectores ao sandbox, primeiro inicie um shell bash interativo no seu container do Docker usando o seguinte comando:

docker exec -it mongo1 /bin/bash

Após o início da sessão da shell, você verá a seguinte solicitação:

MongoDB Kafka Connector Sandbox $

Use a shell em seu container do Docker para adicionar um conector de origem usando a API REST do Kafka Connect.

A seguinte solicitação de API adiciona um conector de origem configurado com as seguintes propriedades:

  • A classe Kafka Connect usa para instanciar o conector

  • A URI de conexão, banco de dados e collection do conjunto de réplicas do MongoDB do qual o conector lê dados

  • Uma aggregation pipeline que adiciona um campo travel com o valor "MongoDB Kafka Connector" aos documentos inseridos que o conector lê do MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

Observação

Por que vejo a mensagem "Falha ao conectar"?

O início da API REST do Kafka Connect leva até três minutos. Se você receber o seguinte erro, aguarde três minutos e execute o comando anterior novamente:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

Para confirmar que adicionou o conector de origem, execute o seguinte comando:

curl -X GET http://connect:8083/connectors

O comando anterior deve produzir os nomes dos conectores em execução:

["mongo-source"]

Para saber mais sobre as propriedades do conector de origem, consulte a página sobre Propriedades de configuração do conector de origem.

Para saber mais sobre aggregation pipelines, consulte a página do Manual do MongoDB em Aggregation pipelines.

Use a shell em seu container do Docker para adicionar um conector de coletor usando a API REST do Kafka Connect.

A seguinte solicitação de API adiciona um conector de coletor configurado com as seguintes propriedades:

  • A classe Kafka Connect usa para instanciar o conector

  • A URI de conexão, banco de dados e collection do conjunto de réplicas do MongoDB para o qual o conector grava dados

  • O tópico Apache Kafka do qual o conector lê dados

  • Um manipulador de captura de dados de alteração para documentos de alteração de eventos do MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

Para confirmar que você adicionou o conector de origem e de coletor, execute o seguinte comando:

curl -X GET http://connect:8083/connectors

O comando anterior deve produzir os nomes dos conectores em execução:

["mongo-source", "mongo-sink"]

Para saber mais sobre as propriedades do conector de coletor, consulte a página sobre Propriedades de configuração do conector de coletores.

Para saber mais sobre como alterar eventos de captura de dados, consulte o guia Manipulador de captura de dados de alteração.

Para enviar o conteúdo de um documento pelos conectores, insira um documento na collection do MongoDB do qual o conector de origem lerá os dados.

Para inserir um novo documento na sua collection, insira o shell do MongoDB do shell no container do Docker usando o seguinte comando:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

Depois de executar o comando anterior, você deverá ver o seguinte prompt:

rs0 [primary] test>

No MongoDB shell, insira um documento na collection sampleData do banco de dados de quickstart usando os seguintes comandos:

use quickstart
db.sampleData.insertOne({"hello":"world"})

Depois de inserir um documento na collection sampleData, confira se seus conectores processaram a alteração. Verifique o conteúdo da collection topicData usando o seguinte comando:

db.topicData.find()

Você deverá ver uma saída semelhante à seguinte:

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

Saia do MongoDB shell com o seguinte comando:

exit

Para conservar recursos em seu ambiente de desenvolvimento, remova a sandbox.

Antes de remover a sandbox, saia da sessão de shell em seu container Docker executando o seguinte comando:

exit

Você pode remover os containers e as imagens do Docker, ou somente os containers. Se você remover os containers e as imagens, deverá baixá-los novamente para reiniciar sua sandbox, que tem aproximadamente 2,4 GB de tamanho. Se você remover somente os containers, poderá reutilizar as imagens sem precisar baixar a maioria dos arquivos grandes na pipeline de dados de amostra.

Selecione a aba que corresponde à tarefa de remoção que deseja executar.

Execute o seguinte comando de shell para remover os contêineres e imagens do Docker da sandbox:

docker-compose -p mongo-kafka down --rmi all

Execute o seguinte comando de shell para remover os contêineres do Docker, mas mantenha as imagens para a sandbox:

docker-compose -p mongo-kafka down

Para instalar o Conector Kafka do MongoDB, veja o guia Como instalar o Conector Kafka do MongoDB.

Para saber mais sobre como processar e mover dados do Apache Kafka para o MongoDB, veja o guia Connector de coletores.

Para saber mais sobre como processar e mover dados do MongoDB para o Apache Kafka, veja o guia Conector de origem.

Voltar

NOVIDADES