Using the Confluent Cloud With Atlas Stream Processing
Robert Walters5 min read • Published Nov 19, 2024 • Updated Nov 19, 2024
Avalie esse Tutorial
O Apache Kafka é uma plataforma de streaming muito popular atualmente. Ele está disponível na comunidade de código aberto e também como software (por exemplo, Plataforma Confluent) para autogerenciamento. Além disso, você pode obter um serviço Kafka hospedado (ou compatível com Kafka) de vários provedores, incluindo AWS Managed Streaming for Apache Kafka (MSK), RedPanda Cloud e Confluent Cloud, para nomear alguns.
Neste tutorial, configuraremos a conectividade de rede entre instâncias do MongoDB Atlas Stream Processing e um tópico dentro da cloud Confluent. Ao final deste tutorial, você poderá processar eventos de stream de tópicos do Confluent Cloud e emitir os resultados de volta para um tópico do Confluent Cloud.
Os clusters dedicados da Confluent Cloud oferecem suporte à conectividade por meio de endpoints públicos seguros da Internet com seus clusters Basic e Standard. Opções de conectividade de rede privada, como conexões Private Link, emparelhamento VPC/VNet e AWS Transit Gateway, estão disponíveis nas camadas do cluster Enterprise e Dedicated.
Observação: no momento em que este artigo foi escrito, o Atlas Stream Processing era compatível apenas com clusters de cloud básica e padrão voltados para a Internet. Esta publicação será atualizada para acomodar clusters Enterprise e Dedicados quando for fornecido suporte para redes privadas.
A maneira mais fácil de começar a usar a conectividade entre o Confluent Cloud e o MongoDB Atlas é usando endpoints públicos da Internet. A conectividade pública com a Internet é a única opção para os clusters Básico e Padrão do Confluent. Tenha certeza de que os clusters da Confluent Cloud com endpoints de internet são protegidos por uma camada de proxy que evita tipos de DoS, DDoS, SYN overflow e outros ataques no nível da rede. Também usaremos chaves de API de autenticação com o método de autenticação SASL_SSL para troca segura de credenciais.
Neste tutorial, configuraremos e configuraremos o Confluent Cloud e o MongoDB Atlas para conectividade de rede e, em seguida, trabalharemos com um exemplo simples que usa um gerador de dados de amostra para transmitir dados entre o MongoDB Atlas e o Confluent Cloud.
Isto é o que você precisa acompanhar:
- Um projeto Atlas (nível gratuito ou pago)
- Para os fins deste tutorial, teremos o usuário "tutorialuser. "
- Shell do MongoDB (Mongosh) versão 2.0+
- Cluster Confluent Cloud (qualquer configuração)
Para este tutorial, você precisa de um cluster do Confluent Cloud criado com um tópico, "solardata, " e uma chave de acesso à API criada. Se você já tiver isso, pode pular para a Etapa 2.
Para criar um cluster do Confluent Cloud, faça login no portal do Confluent Cloud, selecione ou crie um ambiente para seu cluster e clique no botão “Add Cluster”.
Neste tutorial, podemos usar um tipo de clusterBasic.
Após a criação do cluster, crie uma chave de API clicando no menu "API Keys " na visão geral do cluster, no lado esquerdo da página.
Clique em “Create Key” e forneça uma descrição para seu par de chaves, conforme mostrado abaixo.
Anote a chave e a senha da API antes de fazer o download e continuar. Você precisará deles ao criar a conexão no Atlas Stream Processing. Observe que o OAuth confluente e o Logon único confluente não são compatíveis como métodos de autenticação no Atlas Stream Processing.
Em seguida, crie um tópico clicando no item de menu "Topics " e, em seguida, no botão "Add topic ". Aceite as configurações padrão e dê um nome ao tópico: “solardata.: Agora estamos prontos para configurar o MongoDB Atlas Stream Processing.
No MongoDB Atlas, clique em "Stream Processing" no menu Serviços. Em seguida, clique no botão "Create Instance". Forneça um nome, provedor de nuvem e região. Observação: para obter um custo de rede menor, escolha o provedor de nuvem e a região que correspondam ao seu cluster do Confluent Cloud. Neste tutorial, usaremos o Amazon Web Services us-east-1 para o Confluent Cloud e o MongoDB Atlas.
Depois que a Instância de Processamento de Fluxo (SPI) é criada, podemos criar nossa conexão com o Confluent Cloud usando o Registro de Conexão. Clique em "Configure, " e, em seguida, clique na aba "Connection Registry ", conforme mostrado abaixo.
Para criar a conexão com o Confluent Cloud, clique em "Add Connection. "
Selecione, "Kafka " e insira "confluentcloud " para o nome da conexão. Preencha os detalhes a seguir a partir das informações em seu cluster do Confluent Cloud.
- Servidor Bootstrap: fornecido no Confluent Cloud em Configurações de cluster/Pontos de conexão
- Protocolo de segurança: SASL_SSL
- Mecanismo SASL: PLAIN
- Nome de usuário: cole na chave de API
- Senha: Cole o SEGREDO da API
Um exemplo de diálogo de adicionar conexão é mostrado abaixo.
Clique em "Add Connection" e sua nova conexão com o Confluent Cloud aparecerá na lista.
Em seguida, crie outra conexão clicando no botão “Add Connection. Desta vez, selecionaremos "Sample Stream " e "sample_stream_solar " no menu suspenso, conforme mostrado abaixo.
Isso disponibilizará um gerador de dados de amostra chamado “sample_stream_solar” em nosso SPI.
Em seguida, vamos testar a conectividade com o Confluent e executar nosso primeiro Atlas Stream Processor com dados do Confluent Cloud.
Observação: para se conectar ao SPI, você precisará de um usuário do banco de dados com permissões de Atlas Admin ou de um membro da função Project Owner (Proprietário do projeto). Se você ainda não tiver isso, crie-o agora antes de continuar este tutorial.
As informações de conexão podem ser encontradas clicando noConnect botão " " em seu SPI. A caixa de diálogo de conexão é semelhante à caixa de diálogo de conexão ao se conectar a um cluster Atlas. Para se conectar ao SPI, você precisará usar a ferramenta de linha de comandomongosh .
Para se conectar ao SPI, use a connection string fornecida na caixa de diálogo de conexão.
Depois de conectado, você pode enumerar as conexões disponíveis usando o comando
sp.listConnections()
.1 AtlasStreamProcessing> sp.listConnections() 2 [ 3 { name: 'sample_stream_solar', type: 'inmemory' }, 4 { name: 'confluentcloud', type: 'kafka' } 5 ]
Agora que confirmamos que nossos dados de amostra 'sample_stream_solar' e nosso tópico Kafka 'confluentcloud' estão disponíveis, vamos usar a fonte de amostra solar para criar uma consulta de streaming que calcula a potência média gerada e a grava em um tópico do Kafka “solardata”.
1 s_solar={$source: { connectionName: "sample_stream_solar"}} 2 3 Twindow = { $tumblingWindow: { interval: { size: NumberInt(30), unit: "second" }, pipeline: [ { $group: { _id: "$device_id", max: { $max: "$obs.watts" }, avg: { $avg: "$obs.watts" } } }] } } 4 5 write_kafka={ $emit: { "connectionName": "confluentcloud", "topic" : "solardata"}}
Agora que temos nossas variáveis de pipeline definidas, vamos usar o
.process
para executar esse processador de stream em primeiro plano.1 AtlasStreamProcessing> sp.process([s_solar,Twindow,write_kafka])
Para ler os dados do tópico, abra outra janela do terminal e conecte-se ao SPI. Defina uma variável para o tópico Kafka, conforme mostrado abaixo.
1 s_kafka={$source: { connectionName: "confluentcloud", "topic": "solardata"}}
Em seguida, use o comando
.process()
para ler os dados do tópico "solardata".1 AtlasStreamProcessing> sp.process([s_solar])
Após cerca de 30 segundos, você verá a saída de dados do tópico 'solardata'.
Neste tutorial, usamos o Atlas Stream Processing para criar um processador de stream com dados de amostra e escrevemos os resultados da agregação em um tópico do Kafka no Confluent Cloud. Também transmitemos dados da Confluent Cloud para o Atlas Stream Processing e confirmamos que os dados transformados foram gravados no tópico. Este tutorial foi feito sem nenhuma configuração de rede extra.
Você deve se lembrar que, por padrão, nenhuma conexão de rede é permitida no Atlas. Os usuários precisam abrir seu cluster para o mundo adicionando 0.0.0.0 ou especifique intervalos de IP específicos. O que é importante observar é que as conexões do Atlas Stream Processing se originam do Atlas e se conectam à nuvem do Confluent. Portanto, não há IP de acesso à rede que precise ser aberto ou IP permitido na lista.
Na Confluent Cloud, não há nenhum conceito de filtragem IP ou lista de permissões de IP. Por esse motivo, não há nada a ser feito a mais no lado do Confluent Cloud com relação à configuração da rede. No momento em que este artigo foi escrito, 2023 de setembro, as opções de rede privada disponíveis no Confluent Cloud, como PrivateLink, não são suportadas no Atlas Stream Processing. Este tutorial será atualizado quando houver suporte para essas opções de rede privada.
Para saber mais sobre como gerenciar processadores de stream no Atlas Stream Processing, visite nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing agora está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!