Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Introdução ao Atlas Stream Processing: como criar seu primeiro processador de fluxo

Robert Walters4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
AtlasProcessamento de stream
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Atlas Stream Processing está agora disponível. Saiba mais sobre isso aqui.
Se você ainda não estiver familiarizado, o MongoDB Atlas Stream Processing permite o processamento de streams de alta velocidade de dados complexos usando o mesmo modelo de dados e a mesmaAPI de query usada no MongoDB Atlas. A transmissão de dados é cada vez mais crítica para criarexperiências responsivas e orientadas por eventos para seus clientes. O process
Neste tutorial, criaremos um processador de stream que usa dados de amostra incluídos no Atlas Stream Processing. Até o final do tutorial, você terá uma Instância de Processamento de Fluxo (SPI)operacional configurada com um processador de fluxo. Este ambiente pode ser utilizado para experimentação adicional e tutoriais de Atlas Stream Processing no futuro.

Pré-requisitos do tutorial

Isto é o que você precisará para acompanhar:
  • Um usuário do Atlas com permissão atlasAdmin. Para os fins deste tutorial, teremos o usuário "tutorialuser".
  • Shell do MongoDB (Mongosh) versão 2.0+

Crie a instância de processamento de fluxo

Vamos primeiro criar uma Atlas Stream Processing. Pense em um SPI como um agrupamento lógico de um ou mais processadores de fluxo. Quando criado, o SPI tem uma connection string semelhante a um típico MongoDB Atlas.
Na aba Serviços no Projeto Atlas, clique em, "Atlas Stream Processing". Em seguida, clique no botão "Criar instância".
Botão de instância de processamento de fluxo
Isso iniciará a caixa de diálogo Criar instância.
Crie a caixa de diálogo da instância de processamento de fluxo na IU do Atlas
Insira o provedor de nuvem e a região desejados e clique em " Criar ". Você receberá uma caixa de diálogo de confirmação após a criação bem-sucedida.

Configurar o registro de conexão

O registro de conexão armazena informações de conexão nas fontes de dados externas que você deseja usar em um processador de stream. Neste exemplo, usaremos um gerador de dados de amostra que está disponível sem nenhuma configuração extra, mas normalmente você se conectaria ao banco de dados do Kafka ou do Atlas como fonte.
Para gerenciar o registro da conexão, clique em “Configurar” para navegar até a tela de configuração.
A página Atlas Stream Processing estará vazia até que o primeiro processador de stream seja criado
Na tela de configuração, clique na guia "Connection Registry". Tela de configuração com a página do Atlas Stream Processing no Atlas
Em seguida, clique no botão "Adicionar conexão". Isso iniciará a caixa de diálogo Adicionar conexão. Adicionar tela de Conexão na página de Atlas Stream Processing
A partir daqui, você pode adicionar conexões com o Kafka, outros clusters Atlas dentro do projeto ou um fluxo de amostra. Neste tutorial, usaremos a conexão de amostra do Stream. Clique em "Sample Stream" e selecione "sample_stream_solar" na lista de streams de amostra disponíveis. Em seguida, clique em "Adicionar conexão".
A nova "sample_stream_solar" aparecerá na lista de conexões. Uma lista dos seus nomes e tipos de conexão fica visível na página Atlas Stream Processing

Conecte-se à instância de processamento de fluxo (SPI)

Agora que criamos o SPI e configuramos a conexão no registro de conexão, podemos criar um processador de fluxo. Primeiro, precisamos nos conectar ao SPI que criamos anteriormente. Isso pode ser feito usando o MongoDB Shell (mongosh).
Para obter a cadeia de conexão com o SPI, retorne à página principal de processamento de stream clicando no menu "Processamento de stream" na aba Serviços.
Navegue até Stream Processing dentro do painel Services no Atlas
Em seguida, localize o SPI "Tutorial" que acabamos de criar e clique no botão "Conectar". Isso apresentará uma caixa de diálogo de conexão semelhante à encontrada ao se conectar a clusters MongoDB Atlas.
Na interface do usuário do Atlas, cada instância do Atlas Stream Processing está visível para os usuários conectarem e configurarem
Para conectar, precisaremos adicionar um endereço IP de conexão e criar um usuário de banco de dados, se ainda não o tivermos feito.
A caixa de diálogo de conexão de uma instância de processamento de stream passa pela configuração da segurança da conexão, pela escolha de um método de conexão e, por fim, pela conexão com o SPI
Então escolheremos nosso método de conexão. Se você ainda não tiver o mongosh instalado, instale-o usando as instruções fornecidas na caixa de diálogo.
Transmitir tela de conexão do processador no Atlas
Depois que o mongosh estiver instalado, copie a cadeia de conexão da visualização "Tenho o MongoDB Shell instalado" e execute-a em seu terminal.
1Command Terminal > mongosh <<CONNECTION STRING>> --tls --authenticationDatabase admin --username tutorialuser
2
3Enter password: *******************
4
5Current Mongosh Log ID: 64e9e3bf025581952de31587
6Connecting to: mongodb://*****
7Using MongoDB: 6.2.0
8Using Mongosh: 2.0.0
9
10For mongosh info see: https://docs.mongodb.com/mongodb-shell/
11
12AtlasStreamProcessing>
Para confirmar que sample_stream_solar foi adicionado como uma conexão, emita sp.listConnections(). Nossa conexão com sample_stream_solar é mostrada como esperado.
1AtlasStreamProcessing> sp.listConnections()
2{
3 ok: 1,
4 connections: [
5 {
6 name: 'sample_stream_solar',
7 type: 'inmemory',
8 createdAt: ISODate("2023-08-26T18:42:48.357Z")
9 }
10 ]
11}

Crie um processador de fluxo

Se você estiver lendo esta publicação como um pré-requisito para outro tutorial, você pode retornar a esse tutorial agora para continuar.
Nesta seção, concluiremos criando um processador de fluxo simples para processar a fonte sample_stream_solar que usamos ao longo deste tutorial. Esta fonte sample_stream_solar representa a produção de energia observada de diferentes dispositivos (painéis solares únicos). O processamento de fluxo pode ser útil para medir características como a eficiência do painel ou quando a substituição é necessária para um dispositivo que não está mais produzindo energia.
Primeiro, vamos definir um estágio$source para descrever de onde o Atlas Stream Processing lerá os dados de stream.
1var solarstream={$source:{"connectionName": "sample_stream_solar"}}
Agora, emitiremos arquivos .process para visualizar o conteúdo do stream no console. sp.process([solarstream])
.process nos permite obter amostras de nossos dados de origem e testar rapidamente os estágios de um processador de stream para garantir que ele esteja configurado conforme o esperado. Uma amostra desses dados é a seguinte:
1{
2 device_id: 'device_2',
3 group_id: 3,
4 timestamp: '2023-08-27T13:51:53.375+00:00',
5 max_watts: 250,
6 event_type: 0,
7 obs: {
8 watts: 168,
9 temp: 15
10 },
11 _ts: ISODate("2023-08-27T13:51:53.375Z"),
12 _stream_meta: {
13 sourceType: 'sampleData',
14 timestamp: ISODate("2023-08-27T13:51:53.375Z")
15 }
16}

Encerrando

Neste tutorial, Começamos introduzindo o Atlas Stream Processing e por que o processamento de fluxo é um bloco de construção para alimentar aplicativos modernos. Em seguida, percorremos os fundamentos da criação de um processador de fluxo - criamos uma Instância de Processamento de Stream, configuramos uma fonte em nosso registro de conexão usando dados solar de amostra (incluídos no Atlas Stream Processing), conectamos a uma Instância de Processamento de Stream e, finalmente, testamos nosso primeiro processador de fluxo usando .process. Agora você está pronto para explorar o Atlas Stream Processing e criar seus próprios processadores de stream, adicionando funcionalidades avançadas como janelamento e validação.
Se você leu este tutorial e gostaria de saber mais, confira a publicação no blog doMongoDB Atlas Stream Processing . Para saber mais sobre processadores de stream no Atlas Stream Processing, acesse nossa documentação.

Saiba mais sobre o processamento de fluxo do MongoDB Atlas

Para saber mais sobre como gerenciar processadores de stream no Atlas Stream Processing, visite nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing agora está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Artigo

Pesquisa entre clusters usando Atlas Search e Data Federation


Jul 22, 2022 | 3 min read
Tutorial

Sincronize seu aplicativo móvel com o MongoDB Atlas e o Google Cloud MySQL


Feb 08, 2024 | 6 min read
Notícias e Anúncios

Descontinuando o MongoDB Atlas GraphQL e serviços de hospedagem


Mar 12, 2024 | 2 min read
Tutorial

Trabalhando com MongoDB Charts e o novo SDK para JavaScript


Apr 02, 2024 | 10 min read
Sumário