Integração do MongoDB com o Amazon Managed Streaming for Apache Kafka (MSK)
Igor Alekseev, Robert Walters7 min read • Published May 06, 2022 • Updated Sep 17, 2024
Avalie esse Tutorial
O Amazon Managed Streaming for Apache Kafka (MSK) é um serviço Apache Kafka totalmente gerenciado e altamente disponível. O MSK facilita a ingestão e o processamento de dados de streaming em tempo real e o uso desses dados facilmente no ecossistema da AWS . Ao implementar rapidamente uma solução Kafka, você gasta menos tempo gerenciando a infraestrutura e mais tempo resolvendo seus problemas de negócios, aumentando drasticamente a produtividade. O MSK também suporta a integração de fontes de dados como o MongoDB por meio do serviçoAWS MSK Connect (Connect). Esse serviço Connect funciona com o connector MongoDB para Apache Kafka, permitindo a você integrar facilmente os dados do MongoDB.
Nesta publicação do blog, mostraremos como configurar o MSK, configurar o MongoDB Connector para o Apache Kafka e criar uma conexão segura de emparelhamento VPC com MSK e um MongoDB Atlas cluster. O processo de alto nível é o seguinte:
- Configurar o Amazon Managed Streaming for Apache Kafka
- Configurar cliente EC2
- Configure um cluster do MongoDB Atlas
- Configurar o Atlas Private Link, incluindo VPC e sub-rede do MSK
- Configurar plug-in no MSK para o connector MongoDB
- Criar tópico no cluster MSK
- Instalar a ferramenta de linha de comando MongoSH no cliente
- Configurar o MongoDB Connector como origem ou coletor
Neste exemplo, teremos duas collection no mesmo cluster MongoDB — a “source” e a “sink.”. Inserimos dados de amostra na collection de origem do cliente, e esses dados serão consumidos pela MSK por meio do connector MongoDB para Apache Kafka executado como um connector MSK. À medida que os dados chegarem ao tópico MSK, outra instância do MongoDB Connector para Apache Kafka gravará os dados na collection “sink” do MongoDB Atlas cluster. Para nos alinharmos às melhores práticas de configuração segura, configuraremos uma conexão emparelhada de rede da AWS entre o MongoDB Atlas cluster e a VPC contendo o MSK e a instância cliente EC2.
Para criar um cluster do Amazon MSK usando o Console de gerenciamento da AWS, faça login no Console de gerenciamento da AWS e abra o console do Amazon MSK.
- Escolha Criar cluster e selecione Criação rápida.
Para o nome do cluster, insira MongoDBMSKCluster.
Para a versão Apache Kafka, selecione um que seja 2.6.2 ou superior.
Para o tipo de corretor, selecione kafla.t3.small.
Na tabela em Todas as configurações do cluster, copie os valores das seguintes configurações e salve-os, pois você precisará deles mais tarde neste blog:
- VPC
- Sub-redes
- Grupos de segurança associados à VPC
- Escolha “Create cluster.”
Em seguida, configuraremos uma instância do EC2 para criar um tópico. É aqui que a coleção de origem do MongoDB Atlas será gravada. Esse cliente também pode ser usado para consultar o tópico MSK e monitorar o fluxo de mensagens da origem para o coletor.
Para criar uma máquina cliente, abra o console do Amazon EC2 em https://console.aws.amazon.com/ec2/.
- Escolha Iniciar instâncias.
- Escolha Selecionar para criar uma instância do Amazon Linux 2 AMI (HVM) - Kernel 5.10, Tipo de volume SSD.
- Escolha o t2.micro tipo de instância marcando a caixa de seleção.
- Escolha Próximo: Configurar detalhes da instância.
- Navegue até a lista Rede e escolha a VPC cujo ID você salvou na etapa anterior.
- Go para a lista Atribuir automaticamente IP público e escolha Habilitar.
- No menu próximo à parte superior, selecione Adicionar tags.
- Insira Nome para a Chave e MongoDBMSKCluster para o Valor.
- Selecione Review and Launch e, em seguida, clique em Launch.
- Escolha Criar um novo par de chaves, digite MongodBMSKKeyPair como nome do par de chaves e escolha Baixar par de chaves. Como alternativa, você pode usar um par de chaves existente, se preferir.
- Inicie a nova instância pressionando Iniciar instâncias.
Em seguida, precisaremos configurar a rede para permitir a conectividade entre a instância do cliente e o cluster MSK.
- Selecione Visualizar instâncias. Em seguida, na coluna Security Groups, escolha o grupo de segurança que está associado à instância MSKTutorialClient.
- Copie o nome do grupo de segurança e salve-o para mais tarde.
- No painel de navegação, clique em Grupos de segurança. Encontre o grupo de segurança cujo ID você salvou na Etapa 1 (Criar um cluster Amazon MSK). Escolha esta linha marcando a caixa de seleção na primeira coluna.
- Na guia Regras de entrada, escolha Editar regras de entrada.
- Escolha Adicionar regra.
- Na nova regra, escolha Todo o tráfego na coluna Tipo. No segundo campo da coluna Origem, selecione o grupo de segurança da máquina cliente. Este é o grupo cujo nome você salvou anteriormente nesta etapa.
- Clique em Salvar regras.
O grupo de segurança do cluster agora pode aceitar o tráfego proveniente do grupo de segurança da máquina cliente.
Para criar um cluster do MongoDB Atlas, siga o tutorial Introdução ao Atlas. Observe que, neste blog, você precisará criar um cluster M30 do Atlas ou superior, pois o emparelhamento VPC não está disponível para clusters M0, M2 e M5 .
Após a criação do cluster, configure um endpoint privado da AWS na UI do Atlas Network Access, fornecendo as mesmas sub-redes e VPC.
- Clique em Acesso à rede.
- Clique em Private Endpoint e, em seguida, no botão Add Private Endpoint.
- Preencha os IDs de VPC e sub-rede da seção anterior.
- Conecte-se por SSH à máquina cliente criada anteriormente e emita o seguinte comando no portal do Atlas: **aws ec2 create-vpc-endpoint **
- Observe que talvez seja necessário primeiro configurar o comando da AWS CLI usando o aws configure antes de criar a VPC por meio dessa ferramenta. Consulte Fundamentos de configuração para obter mais informações.
A seguir, precisamos criar um plugin personalizado para MSK. Este plugin personalizado será o MongoDB Connector para Apache Kafka. Para referência, observe que o connector precisará ser carregado em um repositório S3 antes de você poder criar o plug-in. Você pode baixar o MongoDB Connector para Apache Kafka em Confluent Hub.
- Selecione "Create custom plugin " no menu Plug-ins personalizados no MSK.
- Preencha o formulário de plugin -in personalizado, incluindo a localização S3 do connector baixado e clique em "Create custom plugin."
Quando iniciamos a leitura dos dados do MongoDB, também precisamos criar um tópico no MSK para aceitar os dados. Vamos instalar o Apache Kafka na instância do EC2 do cliente, que inclui algumas ferramentas básicas.
Para começar, execute o seguinte comando para instalar o Java:
sudo yum install java-1.8.0
Em seguida, execute o comando abaixo para baixar o Apache Kafka.
Com base na etapa anterior, execute este comando no diretório em que você baixou o arquivo TAR:
tar -xzf kafka_2.12-2.6.2.tgz
A distribuição do Kafka inclui uma pastabin com ferramentas que podem ser usadas para gerenciar tópicos. Go para o kafka_2.12-2.6.Diretório 2.
Para criar o tópico que será usado para escrever eventos MongoDB, emita este comando:
bin/kafka-topics.sh --create --zookeeper (INSERT YOUR ZOOKEEPER INFO HERE)--replication-factor 1 --partitions 1 --topic MongoDBMSKDemo.Source
Além disso, lembre-se de que você pode copiar o ponto de extremidade do servidor Zookeeper da página "View Client Information" no Cluster MSK. Neste exemplo, estamos usando texto simples.
Após a criação do plug-in, podemos criar uma instância do connector MongoDB selecionando "Create connector " no menu Conectores.
- Selecione o plug-in MongoDB e clique em "Next. "
- Preencha o formulário da seguinte forma:
Nome do conector: MongoDB Source Connector
Tipo de cluster: connectorMSK
Selecione o cluster MSK que foi criado anteriormente e selecione "None" no menu suspenso de autenticação.
Insira sua configuração do connector (mostrada abaixo) na área de texto de definições de configuração.
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
database=MongoDBMSKDemo
collection=Source
tasks.max=1
connection.uri=(MONGODB CONNECTION STRING HERE)
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Observação: você pode encontrar sua connection string do Atlas clicando no botão Conectar em seu Atlas cluster. Selecione " "Private Endpoint se já tiver configurado o endpoint privado acima e pressione " "Choose a connection method. Em seguida,Connect your application selecione " " e copie a cadeia deconexão mongodb+srv .
Na seção "Access Permissions", você precisará criar uma função IAM com a política de confiança necessária.
Feito isso, clique em "Next." A última seção oferecerá a capacidade de usar logs - o que é altamente recomendável, pois simplificará o processo de solução de problemas.
Agora que temos o connector de origem instalado e funcionando, vamos configurar um connector de coletor para concluir a viagem de ida e volta. Crie outra instância do MongoDB connector selecionando "Create connector " no menu Connectors.
Selecione o mesmo plugin que foi criado anteriormente e preencha o formulário da seguinte maneira:
Nome do connector: MongoDB Connector Tipo de cluster: Conector MSK Selecione o cluster MSK que foi criado anteriormente e selecione "None " no menu suspenso de autenticação. Insira a configuração do connector (mostrada abaixo) na área de texto Definições de configuração.
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
database=MongoDBMSKDemo
collection=Sink
tasks.max=1
topics=MongoDBMSKDemo.Source
connection.uri=(MongoDB Atlas Connection String Gos Here)
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Na seção Permissões de acesso, selecione a função IAM criada anteriormente que tem a política de confiança necessária. Assim como no connector anterior, certifique-se de aproveitar um serviço de registro como o CloudWatch.
Depois que o connector for configurado com sucesso, podemos testar a viagem de ida e volta gravando na coleção Source e vendo os mesmos dados na coleção Sink.
Podemos inserir dados de duas maneiras: por meio da interface do usuário intuitiva do Atlas ou com a nova ferramenta de linha de comando MongoSH (mongoshell). Usando o MongoSH, você pode interagir diretamente com um cluster MongoDB para testar queries, executar operações de banco de dados ad-hoc e muito mais.
Para sua referência, adicionamos uma seção sobre como usar o mongoshell na instância do EC2 do seu cliente abaixo.
Na instância do cliente EC2 , crie um /etc/yum.repos.d/mongodb-org-5.0.repo digitando:
sudo nano /etc/yum.repos.d/mongodb-org-5.0.repo
Cole o seguinte:
[mongodb-org-5.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/5.0/x86_64/
gpgcheck=1
enabled=1
gpgkey=[https://www.mongodb.org/static/pgp/server-5.0.asc](https://www.mongodb.org/static/pgp/server-5.0.asc)
Em seguida, instale o shell MongoSH com este comando:
sudo yum install -y mongodb-mongosh
Use o modelo abaixo para se conectar ao seu cluster MongoDB via mongoshell:
mongosh “ (paste in your Atlas connection string here) “
Depois de conectado, digite:
Use MongoDBMSKDemo
db.Source.insertOne({“Testing”:123})
Para verificar os dados na coleção de coletores, use este comando:
db.Sink.find({})
Se você tiver algum problema, verifique os arquivos de log. Neste exemplo, usamos o CloudWatch para ler os eventos gerados pelo MSK e pelo connector MongoDB para Apache Kafka.
O Amazon Managed Streaming for Apache Kafka (MSK) é um serviço Apache Kafka totalmente gerenciado, seguro e altamente disponível que facilita a ingestão e o processamento de dados de streaming em tempo real. O MSK permite importar connectors Kafka, como o MongoDB Connector para Apache Kafka. Esses connectors facilitam o trabalho com fontes de dados no MSK. Neste artigo, você aprendera como configurar o MSK, o MSK Connect e o MongoDB Connector para o Apache Kafka. Você também aprenderam como configurar um cluster do MongoDB Atlas e configurá-lo para utilizar o emparelhamento de rede AWS. Para continuar seu aprendizado, confira os seguintes recursos: