Introdução ao Atlas Stream Processing: como criar seu primeiro processador de fluxo
Robert Walters4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
Avalie esse Tutorial
Se você ainda não estiver familiarizado, o MongoDB Atlas Stream Processing permite o processamento de streams de alta velocidade de dados complexos usando o mesmo modelo de dados e a mesmaAPI de query usada no MongoDB Atlas. A transmissão de dados é cada vez mais crítica para criarexperiências responsivas e orientadas por eventos para seus clientes. O process
Neste tutorial, criaremos um processador de stream que usa dados de amostra incluídos no Atlas Stream Processing. Até o final do tutorial, você terá uma Instância de Processamento de Fluxo (SPI)operacional configurada com um processador de fluxo. Este ambiente pode ser utilizado para experimentação adicional e tutoriais de Atlas Stream Processing no futuro.
Isto é o que você precisará para acompanhar:
- Um usuário do Atlas com permissão atlasAdmin. Para os fins deste tutorial, teremos o usuário "tutorialuser".
- Shell do MongoDB (Mongosh) versão 2.0+
Vamos primeiro criar uma Atlas Stream Processing. Pense em um SPI como um agrupamento lógico de um ou mais processadores de fluxo. Quando criado, o SPI tem uma connection string semelhante a um típico MongoDB Atlas.
Na aba Serviços no Projeto Atlas, clique em, "Atlas Stream Processing". Em seguida, clique no botão "Criar instância".
Isso iniciará a caixa de diálogo Criar instância.
Insira o provedor de nuvem e a região desejados e clique em " Criar ". Você receberá uma caixa de diálogo de confirmação após a criação bem-sucedida.
O registro de conexão armazena informações de conexão nas fontes de dados externas que você deseja usar em um processador de stream. Neste exemplo, usaremos um gerador de dados de amostra que está disponível sem nenhuma configuração extra, mas normalmente você se conectaria ao banco de dados do Kafka ou do Atlas como fonte.
Para gerenciar o registro da conexão, clique em “Configurar” para navegar até a tela de configuração.
Na tela de configuração, clique na guia "Connection Registry".
Em seguida, clique no botão "Adicionar conexão". Isso iniciará a caixa de diálogo Adicionar conexão.
A partir daqui, você pode adicionar conexões com o Kafka, outros clusters Atlas dentro do projeto ou um fluxo de amostra. Neste tutorial, usaremos a conexão de amostra do Stream. Clique em "Sample Stream" e selecione "sample_stream_solar" na lista de streams de amostra disponíveis. Em seguida, clique em "Adicionar conexão".
A nova "sample_stream_solar" aparecerá na lista de conexões.
Agora que criamos o SPI e configuramos a conexão no registro de conexão, podemos criar um processador de fluxo. Primeiro, precisamos nos conectar ao SPI que criamos anteriormente. Isso pode ser feito usando o MongoDB Shell (mongosh).
Para obter a cadeia de conexão com o SPI, retorne à página principal de processamento de stream clicando no menu "Processamento de stream" na aba Serviços.
Em seguida, localize o SPI "Tutorial" que acabamos de criar e clique no botão "Conectar". Isso apresentará uma caixa de diálogo de conexão semelhante à encontrada ao se conectar a clusters MongoDB Atlas.
Para conectar, precisaremos adicionar um endereço IP de conexão e criar um usuário de banco de dados, se ainda não o tivermos feito.
Então escolheremos nosso método de conexão. Se você ainda não tiver o mongosh instalado, instale-o usando as instruções fornecidas na caixa de diálogo.
Depois que o mongosh estiver instalado, copie a cadeia de conexão da visualização "Tenho o MongoDB Shell instalado" e execute-a em seu terminal.
1 Command Terminal > mongosh <<CONNECTION STRING>> --tls --authenticationDatabase admin --username tutorialuser 2 3 Enter password: ******************* 4 5 Current Mongosh Log ID: 64e9e3bf025581952de31587 6 Connecting to: mongodb://***** 7 Using MongoDB: 6.2.0 8 Using Mongosh: 2.0.0 9 10 For mongosh info see: https://docs.mongodb.com/mongodb-shell/ 11 12 AtlasStreamProcessing>
Para confirmar que sample_stream_solar foi adicionado como uma conexão, emita
sp.listConnections()
. Nossa conexão com sample_stream_solar é mostrada como esperado.1 AtlasStreamProcessing> sp.listConnections() 2 { 3 ok: 1, 4 connections: [ 5 { 6 name: 'sample_stream_solar', 7 type: 'inmemory', 8 createdAt: ISODate("2023-08-26T18:42:48.357Z") 9 } 10 ] 11 }
Se você estiver lendo esta publicação como um pré-requisito para outro tutorial, você pode retornar a esse tutorial agora para continuar.
Nesta seção, concluiremos criando um processador de fluxo simples para processar a fonte sample_stream_solar que usamos ao longo deste tutorial. Esta fonte sample_stream_solar representa a produção de energia observada de diferentes dispositivos (painéis solares únicos). O processamento de fluxo pode ser útil para medir características como a eficiência do painel ou quando a substituição é necessária para um dispositivo que não está mais produzindo energia.
Primeiro, vamos definir um estágio$source para descrever de onde o Atlas Stream Processing lerá os dados de stream.
1 var solarstream={$source:{"connectionName": "sample_stream_solar"}}
Agora, emitiremos arquivos .process para visualizar o conteúdo do stream no console.
sp.process([solarstream])
.process nos permite obter amostras de nossos dados de origem e testar rapidamente os estágios de um processador de stream para garantir que ele esteja configurado conforme o esperado. Uma amostra desses dados é a seguinte:
1 { 2 device_id: 'device_2', 3 group_id: 3, 4 timestamp: '2023-08-27T13:51:53.375+00:00', 5 max_watts: 250, 6 event_type: 0, 7 obs: { 8 watts: 168, 9 temp: 15 10 }, 11 _ts: ISODate("2023-08-27T13:51:53.375Z"), 12 _stream_meta: { 13 sourceType: 'sampleData', 14 timestamp: ISODate("2023-08-27T13:51:53.375Z") 15 } 16 }
Neste tutorial, Começamos introduzindo o Atlas Stream Processing e por que o processamento de fluxo é um bloco de construção para alimentar aplicativos modernos. Em seguida, percorremos os fundamentos da criação de um processador de fluxo - criamos uma Instância de Processamento de Stream, configuramos uma fonte em nosso registro de conexão usando dados solar de amostra (incluídos no Atlas Stream Processing), conectamos a uma Instância de Processamento de Stream e, finalmente, testamos nosso primeiro processador de fluxo usando .process. Agora você está pronto para explorar o Atlas Stream Processing e criar seus próprios processadores de stream, adicionando funcionalidades avançadas como janelamento e validação.
Se você leu este tutorial e gostaria de saber mais, confira a publicação no blog doMongoDB Atlas Stream Processing . Para saber mais sobre processadores de stream no Atlas Stream Processing, acesse nossa documentação.
Para saber mais sobre como gerenciar processadores de stream no Atlas Stream Processing, visite nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing agora está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!