Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Saiba por que o MongoDB foi selecionado como um líder no 2024 Gartner_Magic Quadrupnt()
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
MongoDBchevron-right

Tutorial de ponta a ponta do Kafka para MongoDB Atlas

Pavel Duchovny6 min read • Published Jan 25, 2022 • Updated Jun 07, 2023
KafkaMongoDBJava
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Os aplicativos orientados a dados e eventos estão em alta demanda em uma grande variedade de setores. Com essa demanda, há um desafio crescente de como sincronizar os dados em diferentes fontes de dados.
Uma solução amplamente adotada para comunicar a transferência de dados em tempo real em vários componentes em sistemas da organização é implementada por meio de filas clusterizadas. Uma das soluções populares e testadas é o Apache Kafka.
O cluster do Kafka foi projetado para fluxos de dados que gravam eventos sequencialmente em logs de confirmação, permitindo a movimentação de dados em tempo real entre seus serviços. Os dados são agrupados em tópicos dentro de um cluster Kafka.
O MongoDB fornece um Kafka Connector certificado pela Confluent, um dos maiores fornecedores de Kafka. Com o Kafka Connector e o software Confluent, você pode publicar dados de um cluster MongoDB em tópicos Kafka usando um connector de origem. Além disso, com um connector de pia, você pode consumir dados de um tópico do Kafka para persistir direta e consistentemente em uma collection MongoDB dentro do cluster MongoDB.
Neste artigo, forneceremos um guia passo a passo simples sobre como conectar um cluster Kafka remoto - neste caso, um serviço da Confluent Cloud - a um clusterMongoDB Atlas . Para fins de simplicidade, a instalação é mínima e projetada para um ambiente de desenvolvimento pequeno. No entanto, por meio do artigo, forneceremos orientações e links para considerações relacionadas à produção.
Pré-requisitos: para evitar problemas de certificadoconhecidosdo JDK, atualize seu JDK para uma das seguintes versões de patch ou mais recente:
  • JDK 11.0.7+
  • JDK 13.0.3+
  • JDK 14.0.2+

Sumário

Criar um cluster básico de cloud confluente

Começaremos criando um cluster Kafka básico no Confluent Cloud.
Quando estiver pronto, crie um tópico para ser usado no cluster Kafka. Eu criei um chamado "orders."
Este tópico "orders " será usado pelo connector do Kafka Sink. Quaisquer dados neste tópico serão mantidos automaticamente no banco de dados do Atlas.
Tópicos do cluster Kafka
Você também precisará de outro tópico chamado "outsource.kafka.receipts". Este tópico será usado pelo conector de origem do MongoDB, recebendo streaming do banco de dados Atlas.
Gere um api-key e api-secret para interagir com este cluster Kafka. Para simplificar este tutorial, selecionei a api-key "Global Access ". Para produção, recomenda-se fornecer o mínimo de permissões possível para a chave de API usada. Obtenha as chaves geradas para uso futuro.
Obtenha a string de conexão do cluster Kafka via Cluster Overview > Cluster Settings > Identification > Bootstrap server para uso futuro. Os clusters básicos são abertos à Internet e, na produção, você precisará alterar a lista de acesso para que seus hosts específicos se conectem ao cluster por meio de ACLs de cluster avançadas.

Crie um projeto e um cluster no MongoDB Atlas

Crie um projeto e cluster ou utilize um Atlas cluster existente em seu projeto.
M0 Cluster Atlas
Prepare seu Atlas cluster para uma conexão kafka-connect . Dentro da lista de acesso do seu projeto, habilite o usuário e endereços IP relevantes do seu host local, aquele usado para binários do Kafka Connect. Por fim, obtenha a connection string do Atlas para uso futuro.

Instalar um Kafka Connect Worker

O Kafka Connect é um dos mecanismos para transmitir dados de forma confiável entre diferentes sistemas de dados e um cluster Kafka. Para uso em produção, recomendamos o uso de uma implantação distribuída para alta disponibilidade, tolerância a falhas e escalabilidade. Há também uma versão da nuvem para instalar o connector no Confluent Cloud.
Para este tutorial simples, usaremos uma instalaçãolocal autônoma do Kafka Connect.
Para ter os binários para instalar o kafka-connect e todas as suas dependências, vamos baixar os arquivos:
1curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
2tar -xvf confluent-community-7.0.1.tar.gz

Configurar o Kafka Connect

Configure o diretório de plug-ins onde hospedaremos o -in MongoDB Kafka Connector plugin:
1mkdir -p /usr/local/share/kafka/plugins
Edite o <confluent-install-dir>/etc/schema-registry/connect-avro-standalone.properties usando o conteúdo fornecido abaixo. Certifique-se de substituir o <kafka-cluster>:<kafka-port> por informações retiradas do servidor de inicialização do Confluent Cloud anterior.
Além disso, substitua os <api-key> e <api-secret>gerados a partir do Confluent Cloud em cada seção.
1bootstrap.servers=<kafka-cluster>:<kafka-port>
2
3Connect data. Every Connect user will
4# need to configure these based on the format they want their data in when loaded from or stored into Kafka
5key.converter=org.apache.kafka.connect.json.JsonConverter
6value.converter=org.apache.kafka.connect.json.JsonConverter
7# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
8# it to
9key.converter.schemas.enable=false
10value.converter.schemas.enable=false
11
12# The internal converter used for offsets and config data is configurable and must be specified, but most users will
13# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
14internal.key.converter=org.apache.kafka.connect.json.JsonConverter
15internal.value.converter=org.apache.kafka.connect.json.JsonConverter
16internal.key.converter.schemas.enable=false
17internal.value.converter.schemas.enable=false
18
19# Store offsets on local filesystem
20offset.storage.file.filename=/tmp/connect.offsets
21# Flush much faster than normal, which is useful for testing/debugging
22offset.flush.interval.ms=10000
23
24ssl.endpoint.identification.algorithm=https
25
26
27sasl.mechanism=PLAIN
28request.timeout.ms=20000
29retry.backoff.ms=500
30sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
31username="<api-key>" password="<api-secret>";
32security.protocol=SASL_SSL
33
34consumer.ssl.endpoint.identification.algorithm=https
35consumer.sasl.mechanism=PLAIN
36consumer.request.timeout.ms=20000
37consumer.retry.backoff.ms=500
38consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
39username="<api-key>" password="<api-secret>";
40consumer.security.protocol=SASL_SSL
41
42producer.ssl.endpoint.identification.algorithm=https
43producer.sasl.mechanism=PLAIN
44producer.request.timeout.ms=20000
45producer.retry.backoff.ms=500
46producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
47username="<api-key>" password="<api-secret>";
48producer.security.protocol=SASL_SSL
49
50plugin.path=/usr/local/share/kafka/plugins
Importante: coloque o plugin.path para apontar para o diretório do nosso plugin com permissões para o usuário que está executando o processo kafka-connect.

Instale o JAR do conector do MongoDB:

Faça o download do recipiente "all" e coloque-o no diretório de plugin -ins.
1cp ~/Downloads/mongo-kafka-connect-1.6.1-all.jar /usr/local/share/kafka/plugins/

Configurar um conector do coletor MongoDB

O connector MongoDB Sink nos permitirá ler dados de um tópico específico do Kafka e escrever em uma collection MongoDB dentro de nosso cluster. Crie um arquivo de propriedades do connector de coletor do MongoDB no diretório de trabalho principal: mongo-sink.properties com os detalhes do cluster do Atlas substituindo <username>:<password>@<atlas-cluster>/<database> na guia Conexão do Atlas. O diretório de trabalho pode ser qualquer diretório ao qual o binárioconnect-standalone tenha acesso e seu caminho pode ser fornecido para o comando kafka-connectmostrado na seção"Iniciar o Kafka Connect e conectores".
1name=mongo-sink
2topics=orders
3connector.class=com.mongodb.kafka.connect.MongoSinkConnector
4tasks.max=1
5connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority
6database=kafka
7collection=orders
8max.num.retries=1
9retries.defer.timeout=5000
Com a configuração acima, ouviremos o tópico chamado "orders " e publicaremos os documentos de entrada no banco de dadoskafka e no nome da coleção orders.

Configurar o conector de origem do Mongo

O MongoDB Source connector nos permitirá ler dados de um tópico específico da coleção do MongoDB e escrever em um tópico do Kafka. Quando os dados chegarem a uma coleção chamada receipts, podemos usar um connector para transferi-los para um tópico predefinido do Kafka chamado "outsource.kafka.receipts " (o prefixo configurado seguido pelo nome<database>.<collection> como tópico — é possível use mapeamento avançado para mudar isso).
Vamos criar o arquivo mongo-source.properties no diretório de trabalho principal:
1name=mongo-source
2connector.class=com.mongodb.kafka.connect.MongoSourceConnector
3tasks.max=1
4
5# Connection and source configuration
6connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority
7database=kafka
8collection=receipts
9
10topic.prefix=outsource
11topic.suffix=
12poll.max.batch.size=1000
13poll.await.time.ms=5000
14
15# Change stream options
16pipeline=[]
17batch.size=0
18change.stream.full.document=updateLookup
19publish.full.document.only=true
20collation=
As principais propriedades aqui são o banco de dados, a collection e o pipeline de agregação usados para escutar as alterações de entrada, bem como a connection string. O topic.prefix adiciona um prefixo ao namespace<database>.<collection> como o tópico Kafka no lado Confluent. Nesse caso, o nome do tópico que receberá novos registros MongoDB é "outsource.kafka.receipts " e foi predefinido anteriormente neste tutorial.
Também adicionei publish.full.document.only=true, pois só preciso do documento real alterado ou inserido sem as informações de encapsulamento do evento change stream.

Inicie o Kafka Connect e os conectores

Por motivos de simplicidade, estou executando o Kafka Connect standalone em primeiro plano.
1 ./confluent-7.0.1/bin/connect-standalone ./confluent-7.0.1/etc/schema-registry/connect-avro-standalone.properties mongo-sink.properties mongo-source.properties
Importante: execute com a versão mais recente do Java para evitar bugs SSL do JDK.
Agora, cada documento que será preenchido no tópico "orders " será inserido na collectionorders usando um connector de pia. Um connector de origem que configuramos transmitirá cada documento de recebimento da collectionreceipt de volta para outro tópico chamado "outsource.kafka.receipts" para mostrar um consumo do MongoDB a um tópico do Kafka.

Publicar documentos na fila do Kafka

Por meio da interface do usuário do Confluent, enviei um documento de teste para o meu tópico “orders”. Produzir dados no tópico "ordens"

O Atlas Cluster está sendo automaticamente preenchido com os dados

Analisando meu Atlas cluster, posso ver uma nova collection chamada orders no banco de dadoskafka.
Collection de pedidos
Agora, vamos presumir que nosso aplicativo recebeu o documento de pedido da collectionorders e gerou um recebimento. Podemos replicar isso inserindo um documento na coleçãokafka.reciepts:
Collection de receitas
Esta operação fará com que o connector de origem produza uma mensagem no tópico "outsource.kafka.reciepts".

Kafka "outsource.kafka.reciepts" topic

Dados recebidos em "outsource.kafka.reciepts" topic
As linhas de registro no kafka-connect mostrarão que o processo recebeu e publicou o documento:
1[2021-12-14 15:31:18,376] INFO [mongo-source|task-0] [Producer clientId=connector-producer-mongo-source-0] Cluster ID: lkc-65rmj (org.apache.kafka.clients.Metadata:287)
2[2021-12-14 15:31:18,675] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:21, serverValue:99712}] to dev-shard-00-02.uvwhr.mongodb.net:27017 (org.mongodb.driver.connection:71)
3[2021-12-14 15:31:18,773] INFO [mongo-source|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
4[2021-12-14 15:31:18,773] INFO [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
5[2021-12-14 15:31:27,671] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505
6[2021-12-14 15:31:37,673] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)

Resumo

Neste artigo de instruções, abordei os fundamentos da criação de uma integração simples, mas poderosa, do MongoDB Atlas com clusters Kafka usando o MongoDB Kafka Connector e o Kafka Connect.
Esse deve ser um bom ponto de partida para você começar com sua próxima pilha de aplicativos orientada a eventos e uma integração bem-sucedida entre o MongoDB e o Kafka.
Experimente o MongoDB Atlas e oKafka Connector hoje mesmo!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Início rápido

Início rápido: tipos de dados BSON - ObjectId


Sep 23, 2022 | 3 min read
Tutorial

Adição de notificações em tempo real ao Ghost CMS usando MongoDB e eventos enviados pelo servidor


Aug 14, 2023 | 7 min read
Artigo

Gerando comandos de shell MQL usando OpenAI e o novo shell mongosh


Jul 11, 2023 | 7 min read
Artigo

Paginações 1.0: Coleções de séries temporais em cinco minutos


May 19, 2022 | 4 min read
Sumário
  • Sumário
  • Criar um cluster básico de cloud confluente
  • Crie um projeto e um cluster no MongoDB Atlas
  • Instalar um Kafka Connect Worker
  • Configurar o Kafka Connect
  • Publicar documentos na fila do Kafka
  • Resumo