Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Using the Confluent Cloud With Atlas Stream Processing

Robert Walters5 min read • Published Nov 19, 2024 • Updated Nov 19, 2024
Processamento de streamAtlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Atlas Stream Processing is generally available!
O Apache Kafka é uma plataforma de streaming muito popular atualmente. Ele está disponível na comunidade de código aberto e também como software (por exemplo, Plataforma Confluent) para autogerenciamento. Além disso, você pode obter um serviço Kafka hospedado (ou compatível com Kafka) de vários provedores, incluindo AWS Managed Streaming for Apache Kafka (MSK), RedPanda Cloud e Confluent Cloud, para nomear alguns.
Neste tutorial, configuraremos a conectividade de rede entre instâncias do MongoDB Atlas Stream Processing e um tópico dentro da cloud Confluent. Ao final deste tutorial, você poderá processar eventos de stream de tópicos do Confluent Cloud e emitir os resultados de volta para um tópico do Confluent Cloud.
Os clusters dedicados da Confluent Cloud oferecem suporte à conectividade por meio de endpoints públicos seguros da Internet com seus clusters Basic e Standard. Opções de conectividade de rede privada, como conexões Private Link, emparelhamento VPC/VNet e AWS Transit Gateway, estão disponíveis nas camadas do cluster Enterprise e Dedicated.
Observação: no momento em que este artigo foi escrito, o Atlas Stream Processing era compatível apenas com clusters de cloud básica e padrão voltados para a Internet. Esta publicação será atualizada para acomodar clusters Enterprise e Dedicados quando for fornecido suporte para redes privadas.
A maneira mais fácil de começar a usar a conectividade entre o Confluent Cloud e o MongoDB Atlas é usando endpoints públicos da Internet. A conectividade pública com a Internet é a única opção para os clusters Básico e Padrão do Confluent. Tenha certeza de que os clusters da Confluent Cloud com endpoints de internet são protegidos por uma camada de proxy que evita tipos de DoS, DDoS, SYN overflow e outros ataques no nível da rede. Também usaremos chaves de API de autenticação com o método de autenticação SASL_SSL para troca segura de credenciais.
Neste tutorial, configuraremos e configuraremos o Confluent Cloud e o MongoDB Atlas para conectividade de rede e, em seguida, trabalharemos com um exemplo simples que usa um gerador de dados de amostra para transmitir dados entre o MongoDB Atlas e o Confluent Cloud.

Pré-requisitos do tutorial

Isto é o que você precisa acompanhar:
  • Um projeto Atlas (nível gratuito ou pago)
  • Um usuário do banco de dados Atlas com permissão atlasAdmin
    • Para os fins deste tutorial, teremos o usuário "tutorialuser. "
  • Shell do MongoDB (Mongosh) versão 2.0+
  • Cluster Confluent Cloud (qualquer configuração)

Configurar Nuvem Confluente

Para este tutorial, você precisa de um cluster do Confluent Cloud criado com um tópico, "solardata, " e uma chave de acesso à API criada. Se você já tiver isso, pode pular para a Etapa 2.
Para criar um cluster do Confluent Cloud, faça login no portal do Confluent Cloud, selecione ou crie um ambiente para seu cluster e clique no botão “Add Cluster”.
Neste tutorial, podemos usar um tipo de clusterBasic.
Criar cluster do Confluent Cloud
Após a criação do cluster, crie uma chave de API clicando no menu "API Keys " na visão geral do cluster, no lado esquerdo da página.
Caixa de diálogo Chaves API do Confluent Cloud
Clique em “Create Key” e forneça uma descrição para seu par de chaves, conforme mostrado abaixo.
Caixa de diálogo de chave do Confluent Cloud Create
Anote a chave e a senha da API antes de fazer o download e continuar. Você precisará deles ao criar a conexão no Atlas Stream Processing. Observe que o OAuth confluente e o Logon único confluente não são compatíveis como métodos de autenticação no Atlas Stream Processing.
Em seguida, crie um tópico clicando no item de menu "Topics " e, em seguida, no botão "Add topic ". Aceite as configurações padrão e dê um nome ao tópico: “solardata.: Agora estamos prontos para configurar o MongoDB Atlas Stream Processing.

Configurar Registro de Conexão do Atlas Stream Processing

No MongoDB Atlas, clique em "Stream Processing" no menu Serviços. Em seguida, clique no botão "Create Instance". Forneça um nome, provedor de nuvem e região. Observação: para obter um custo de rede menor, escolha o provedor de nuvem e a região que correspondam ao seu cluster do Confluent Cloud. Neste tutorial, usaremos o Amazon Web Services us-east-1 para o Confluent Cloud e o MongoDB Atlas.
O dashboard Instância do processador de fluxo do Atlas Stream Processing
Depois que a Instância de Processamento de Fluxo (SPI) é criada, podemos criar nossa conexão com o Confluent Cloud usando o Registro de Conexão. Clique em "Configure, " e, em seguida, clique na aba "Connection Registry ", conforme mostrado abaixo.
Configurar o dashboard da Instância do Processador de Stream do Atlas
Para criar a conexão com o Confluent Cloud, clique em "Add Connection. "
Selecione, "Kafka " e insira "confluentcloud " para o nome da conexão. Preencha os detalhes a seguir a partir das informações em seu cluster do Confluent Cloud.
  • Servidor Bootstrap: fornecido no Confluent Cloud em Configurações de cluster/Pontos de conexão
  • Protocolo de segurança: SASL_SSL
  • Mecanismo SASL: PLAIN
  • Nome de usuário: cole na chave de API
  • Senha: Cole o SEGREDO da API
Um exemplo de diálogo de adicionar conexão é mostrado abaixo.
Adicione o diálogo de conexão do registro de conexão
Clique em "Add Connection" e sua nova conexão com o Confluent Cloud aparecerá na lista.
Painel de conexão do Registro de Conexão
Em seguida, crie outra conexão clicando no botão “Add Connection. Desta vez, selecionaremos "Sample Stream " e "sample_stream_solar " no menu suspenso, conforme mostrado abaixo.
Adicionar conexão de transmissão de amostra
Isso disponibilizará um gerador de dados de amostra chamado “sample_stream_solar” em nosso SPI.
Em seguida, vamos testar a conectividade com o Confluent e executar nosso primeiro Atlas Stream Processor com dados do Confluent Cloud.

Crie o processador de fluxo no Atlas

Observação: para se conectar ao SPI, você precisará de um usuário do banco de dados com permissões de Atlas Admin ou de um membro da função Project Owner (Proprietário do projeto). Se você ainda não tiver isso, crie-o agora antes de continuar este tutorial.
As informações de conexão podem ser encontradas clicando noConnect botão " " em seu SPI. A caixa de diálogo de conexão é semelhante à caixa de diálogo de conexão ao se conectar a um cluster Atlas. Para se conectar ao SPI, você precisará usar a ferramenta de linha de comandomongosh .
Caixa de diálogo Conectar para a instância do processador de fluxo
Para se conectar ao SPI, use a connection string fornecida na caixa de diálogo de conexão.
Depois de conectado, você pode enumerar as conexões disponíveis usando o comandosp.listConnections().
1AtlasStreamProcessing> sp.listConnections()
2[
3 { name: 'sample_stream_solar', type: 'inmemory' },
4 { name: 'confluentcloud', type: 'kafka' }
5]
Agora que confirmamos que nossos dados de amostra 'sample_stream_solar' e nosso tópico Kafka 'confluentcloud' estão disponíveis, vamos usar a fonte de amostra solar para criar uma consulta de streaming que calcula a potência média gerada e a grava em um tópico do Kafka “solardata”.
1s_solar={$source: { connectionName: "sample_stream_solar"}}
2
3Twindow = { $tumblingWindow: { interval: { size: NumberInt(30), unit: "second" }, pipeline: [ { $group: { _id: "$device_id", max: { $max: "$obs.watts" }, avg: { $avg: "$obs.watts" } } }] } }
4
5write_kafka={ $emit: { "connectionName": "confluentcloud", "topic" : "solardata"}}
Agora que temos nossas variáveis de pipeline definidas, vamos usar o .process para executar esse processador de stream em primeiro plano.
1AtlasStreamProcessing> sp.process([s_solar,Twindow,write_kafka])
Para ler os dados do tópico, abra outra janela do terminal e conecte-se ao SPI. Defina uma variável para o tópico Kafka, conforme mostrado abaixo.
1s_kafka={$source: { connectionName: "confluentcloud", "topic": "solardata"}}
Em seguida, use o comando.process()para ler os dados do tópico "solardata".
1AtlasStreamProcessing> sp.process([s_solar])
Após cerca de 30 segundos, você verá a saída de dados do tópico 'solardata'.

Encerrando

Neste tutorial, usamos o Atlas Stream Processing para criar um processador de stream com dados de amostra e escrevemos os resultados da agregação em um tópico do Kafka no Confluent Cloud. Também transmitemos dados da Confluent Cloud para o Atlas Stream Processing e confirmamos que os dados transformados foram gravados no tópico. Este tutorial foi feito sem nenhuma configuração de rede extra.
Você deve se lembrar que, por padrão, nenhuma conexão de rede é permitida no Atlas. Os usuários precisam abrir seu cluster para o mundo adicionando 0.0.0.0 ou especifique intervalos de IP específicos. O que é importante observar é que as conexões do Atlas Stream Processing se originam do Atlas e se conectam à nuvem do Confluent. Portanto, não há IP de acesso à rede que precise ser aberto ou IP permitido na lista.
Na Confluent Cloud, não há nenhum conceito de filtragem IP ou lista de permissões de IP. Por esse motivo, não há nada a ser feito a mais no lado do Confluent Cloud com relação à configuração da rede. No momento em que este artigo foi escrito, 2023 de setembro, as opções de rede privada disponíveis no Confluent Cloud, como PrivateLink, não são suportadas no Atlas Stream Processing. Este tutorial será atualizado quando houver suporte para essas opções de rede privada.

Saiba mais sobre o processamento de fluxo do MongoDB Atlas

Para saber mais sobre como gerenciar processadores de stream no Atlas Stream Processing, visite nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing agora está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Criando aplicações de IA com o Microsoft Semantic Kernel e o MongoDB Atlas Vector Search


Nov 27, 2023 | 8 min read
Início rápido

MongoDB com agente Bedrock: tutorial rápido


Jul 01, 2024 | 6 min read
Tutorial

Como implementar o Agentic RAG usando o Claude 3.5 Sonnet, LlamaIndex e MongoDB


Jul 02, 2024 | 17 min read
Artigo

Integração do Atlas Data Lake SQL para formar interações de dados poderosas


Jun 12, 2023 | 3 min read
Sumário