Tutorial de ponta a ponta do Kafka para MongoDB Atlas
Avalie esse Tutorial
Os aplicativos orientados a dados e eventos estão em alta demanda em uma grande variedade de setores. Com essa demanda, há um desafio crescente de como sincronizar os dados em diferentes fontes de dados.
Uma solução amplamente adotada para comunicar a transferência de dados em tempo real em vários componentes em sistemas da organização é implementada por meio de filas clusterizadas. Uma das soluções populares e testadas é o Apache Kafka.
O cluster do Kafka foi projetado para fluxos de dados que gravam eventos sequencialmente em logs de confirmação, permitindo a movimentação de dados em tempo real entre seus serviços. Os dados são agrupados em tópicos dentro de um cluster Kafka.
O MongoDB fornece um Kafka Connector certificado pela Confluent, um dos maiores fornecedores de Kafka. Com o Kafka Connector e o software Confluent, você pode publicar dados de um cluster MongoDB em tópicos Kafka usando um connector de origem. Além disso, com um connector de pia, você pode consumir dados de um tópico do Kafka para persistir direta e consistentemente em uma collection MongoDB dentro do cluster MongoDB.
Neste artigo, forneceremos um guia passo a passo simples sobre como conectar um cluster Kafka remoto - neste caso, um serviço da Confluent Cloud - a um clusterMongoDB Atlas . Para fins de simplicidade, a instalação é mínima e projetada para um ambiente de desenvolvimento pequeno. No entanto, por meio do artigo, forneceremos orientações e links para considerações relacionadas à produção.
Pré-requisitos: para evitar problemas de certificadoconhecidosdo JDK, atualize seu JDK para uma das seguintes versões de patch ou mais recente:
- JDK 11.0.7+
- JDK 13.0.3+
- JDK 14.0.2+
Quando estiver pronto, crie um tópico para ser usado no cluster Kafka. Eu criei um chamado "orders."
Este tópico "orders " será usado pelo connector do Kafka Sink. Quaisquer dados neste tópico serão mantidos automaticamente no banco de dados do Atlas.
Você também precisará de outro tópico chamado "outsource.kafka.receipts". Este tópico será usado pelo conector de origem do MongoDB, recebendo streaming do banco de dados Atlas.
Gere um
api-key
e api-secret
para interagir com este cluster Kafka. Para simplificar este tutorial, selecionei a api-key "Global Access ". Para produção, recomenda-se fornecer o mínimo de permissões possível para a chave de API usada. Obtenha as chaves geradas para uso futuro.Obtenha a string de conexão do cluster Kafka via
Cluster Overview > Cluster Settings > Identification > Bootstrap server
para uso futuro. Os clusters básicos são abertos à Internet e, na produção, você precisará alterar a lista de acesso para que seus hosts específicos se conectem ao cluster por meio de ACLs de cluster avançadas.Crie um projeto e cluster ou utilize um Atlas cluster existente em seu projeto.
Prepare seu Atlas cluster para uma conexão kafka-connect . Dentro da lista de acesso do seu projeto, habilite o usuário e endereços IP relevantes do seu host local, aquele usado para binários do Kafka Connect. Por fim, obtenha a connection string do Atlas para uso futuro.
O Kafka Connect é um dos mecanismos para transmitir dados de forma confiável entre diferentes sistemas de dados e um cluster Kafka. Para uso em produção, recomendamos o uso de uma implantação distribuída para alta disponibilidade, tolerância a falhas e escalabilidade. Há também uma versão da nuvem para instalar o connector no Confluent Cloud.
Para ter os binários para instalar o kafka-connect e todas as suas dependências, vamos baixar os arquivos:
1 curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz 2 tar -xvf confluent-community-7.0.1.tar.gz
Configure o diretório de plug-ins onde hospedaremos o -in MongoDB Kafka Connector plugin:
1 mkdir -p /usr/local/share/kafka/plugins
Edite o
<confluent-install-dir>/etc/schema-registry/connect-avro-standalone.properties
usando o conteúdo fornecido abaixo. Certifique-se de substituir o <kafka-cluster>:<kafka-port>
por informações retiradas do servidor de inicialização do Confluent Cloud anterior.Além disso, substitua os
<api-key>
e <api-secret>
gerados a partir do Confluent Cloud em cada seção.1 bootstrap.servers=<kafka-cluster>:<kafka-port> 2 3 Connect data. Every Connect user will 4 # need to configure these based on the format they want their data in when loaded from or stored into Kafka 5 key.converter=org.apache.kafka.connect.json.JsonConverter 6 value.converter=org.apache.kafka.connect.json.JsonConverter 7 # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply 8 # it to 9 key.converter.schemas.enable=false 10 value.converter.schemas.enable=false 11 12 # The internal converter used for offsets and config data is configurable and must be specified, but most users will 13 # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. 14 internal.key.converter=org.apache.kafka.connect.json.JsonConverter 15 internal.value.converter=org.apache.kafka.connect.json.JsonConverter 16 internal.key.converter.schemas.enable=false 17 internal.value.converter.schemas.enable=false 18 19 # Store offsets on local filesystem 20 offset.storage.file.filename=/tmp/connect.offsets 21 # Flush much faster than normal, which is useful for testing/debugging 22 offset.flush.interval.ms=10000 23 24 ssl.endpoint.identification.algorithm=https 25 26 27 sasl.mechanism=PLAIN 28 request.timeout.ms=20000 29 retry.backoff.ms=500 30 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 31 username="<api-key>" password="<api-secret>"; 32 security.protocol=SASL_SSL 33 34 consumer.ssl.endpoint.identification.algorithm=https 35 consumer.sasl.mechanism=PLAIN 36 consumer.request.timeout.ms=20000 37 consumer.retry.backoff.ms=500 38 consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 39 username="<api-key>" password="<api-secret>"; 40 consumer.security.protocol=SASL_SSL 41 42 producer.ssl.endpoint.identification.algorithm=https 43 producer.sasl.mechanism=PLAIN 44 producer.request.timeout.ms=20000 45 producer.retry.backoff.ms=500 46 producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 47 username="<api-key>" password="<api-secret>"; 48 producer.security.protocol=SASL_SSL 49 50 plugin.path=/usr/local/share/kafka/plugins
Importante: coloque o
plugin.path
para apontar para o diretório do nosso plugin com permissões para o usuário que está executando o processo kafka-connect.1 cp ~/Downloads/mongo-kafka-connect-1.6.1-all.jar /usr/local/share/kafka/plugins/
O connector MongoDB Sink nos permitirá ler dados de um tópico específico do Kafka e escrever em uma collection MongoDB dentro de nosso cluster. Crie um arquivo de propriedades do connector de coletor do MongoDB no diretório de trabalho principal:
mongo-sink.properties
com os detalhes do cluster do Atlas substituindo <username>:<password>@<atlas-cluster>/<database>
na guia Conexão do Atlas. O diretório de trabalho pode ser qualquer diretório ao qual o binárioconnect-standalone
tenha acesso e seu caminho pode ser fornecido para o comando kafka-connect
mostrado na seção"Iniciar o Kafka Connect e conectores".1 name=mongo-sink 2 topics=orders 3 connector.class=com.mongodb.kafka.connect.MongoSinkConnector 4 tasks.max=1 5 connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority 6 database=kafka 7 collection=orders 8 max.num.retries=1 9 retries.defer.timeout=5000
Com a configuração acima, ouviremos o tópico chamado "orders " e publicaremos os documentos de entrada no banco de dados
kafka
e no nome da coleção orders
.O MongoDB Source connector nos permitirá ler dados de um tópico específico da coleção do MongoDB e escrever em um tópico do Kafka. Quando os dados chegarem a uma coleção chamada
receipts
, podemos usar um connector para transferi-los para um tópico predefinido do Kafka chamado "outsource.kafka.receipts " (o prefixo configurado seguido pelo nome<database>.<collection>
como tópico — é possível use mapeamento avançado para mudar isso).Vamos criar o arquivo
mongo-source.properties
no diretório de trabalho principal:1 name=mongo-source 2 connector.class=com.mongodb.kafka.connect.MongoSourceConnector 3 tasks.max=1 4 5 # Connection and source configuration 6 connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority 7 database=kafka 8 collection=receipts 9 10 topic.prefix=outsource 11 topic.suffix= 12 poll.max.batch.size=1000 13 poll.await.time.ms=5000 14 15 # Change stream options 16 pipeline=[] 17 batch.size=0 18 change.stream.full.document=updateLookup 19 publish.full.document.only=true 20 collation=
As principais propriedades aqui são o banco de dados, a collection e o pipeline de agregação usados para escutar as alterações de entrada, bem como a connection string. O
topic.prefix
adiciona um prefixo ao namespace<database>.<collection>
como o tópico Kafka no lado Confluent. Nesse caso, o nome do tópico que receberá novos registros MongoDB é "outsource.kafka.receipts " e foi predefinido anteriormente neste tutorial.Também adicionei
publish.full.document.only=true
, pois só preciso do documento real alterado ou inserido sem as informações de encapsulamento do evento change stream.Por motivos de simplicidade, estou executando o Kafka Connect standalone em primeiro plano.
1 ./confluent-7.0.1/bin/connect-standalone ./confluent-7.0.1/etc/schema-registry/connect-avro-standalone.properties mongo-sink.properties mongo-source.properties
Importante: execute com a versão mais recente do Java para evitar bugs SSL do JDK.
Agora, cada documento que será preenchido no tópico "orders " será inserido na collection
orders
usando um connector de pia. Um connector de origem que configuramos transmitirá cada documento de recebimento da collectionreceipt
de volta para outro tópico chamado "outsource.kafka.receipts" para mostrar um consumo do MongoDB a um tópico do Kafka.Por meio da interface do usuário do Confluent, enviei um documento de teste para o meu tópico “orders”.
Analisando meu Atlas cluster, posso ver uma nova collection chamada
orders
no banco de dadoskafka
.Agora, vamos presumir que nosso aplicativo recebeu o documento de pedido da collection
orders
e gerou um recebimento. Podemos replicar isso inserindo um documento na coleçãokafka.reciepts
:Esta operação fará com que o connector de origem produza uma mensagem no tópico "outsource.kafka.reciepts".
As linhas de registro no kafka-connect mostrarão que o processo recebeu e publicou o documento:
1 [2021-12-14 15:31:18,376] INFO [mongo-source|task-0] [Producer clientId=connector-producer-mongo-source-0] Cluster ID: lkc-65rmj (org.apache.kafka.clients.Metadata:287) 2 [2021-12-14 15:31:18,675] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:21, serverValue:99712}] to dev-shard-00-02.uvwhr.mongodb.net:27017 (org.mongodb.driver.connection:71) 3 [2021-12-14 15:31:18,773] INFO [mongo-source|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203) 4 [2021-12-14 15:31:18,773] INFO [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233) 5 [2021-12-14 15:31:27,671] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505 6 [2021-12-14 15:31:37,673] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
Neste artigo de instruções, abordei os fundamentos da criação de uma integração simples, mas poderosa, do MongoDB Atlas com clusters Kafka usando o MongoDB Kafka Connector e o Kafka Connect.
Esse deve ser um bom ponto de partida para você começar com sua próxima pilha de aplicativos orientada a eventos e uma integração bem-sucedida entre o MongoDB e o Kafka.