Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Introdução ao desenvolvimento do Atlas Stream Processing

Hubert Nguyen7 min read • Published Aug 28, 2024 • Updated Aug 28, 2024
Processamento de streamAtlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Bem-vindo a este tutorial de Atlas Stream Processing! Neste guia, configuraremos rapidamente um fluxo de trabalho de codificação e faremos com que você escreva e execute sua primeira instância de Atlas Stream Processing rapidamente. Em muito pouco tempo, aprenderemos como criar uma nova instância de processador de fluxo, codificar e executar processadores de fluxo de maneira conveniente a partir do Visual Studio Code e simplesmente agregar dados de fluxo, abrindo assim a porta para um campo totalmente novo do MongoDB Atlas. plataforma de dados.

O que abordaremos

  • Pré-requisitos
  • Configurar
  • Criar uma instância do processador de fluxo
  • Configurar o Visual Studio Code
  • A Anatomia de um Processador de Stream
  • Vamos executar um processador de fluxo!
  • Dados codificados na declaração $source
  • Processador de fluxo mais simples
  • Agregação de processamento de fluxo
  • Adicionar carimbos de data/hora aos dados

Pré-requisitos

Configurar

Crie uma instância de processamento de fluxo do Atlas

Precisamos ter uma Atlas Stream Processing Instance (SPI) pronta. Siga as etapas do tutorial Introdução ao Atlas Stream Processing: criando seu primeiro processador de streams até que tenhamos nossa connection string e nome de usuário/senha. Em seguida, volte aqui.
Não se trata de adicionar seu endereço IP ao Atlas Network Access para permitir que o cliente acesse a instância.

Configurar o código do Visual Studio para processamento de fluxo do MongoDB Atlas

Graças à extensão MongoDB para VS Code, podemos desenvolver rapidamente pipelines de agregação de processamento de fluxo (SP) e executá-los diretamente de dentro de um playground do VS Code MongoDB. Isso proporciona uma experiência muito melhor para o desenvolvedor. No restante deste artigo, usaremos o VS Code.
Esse playground é um ambiente NodeJS onde podemos executar JavaScript interagindo com um processador de transmissão ao vivo no MongoDB Atlas. Para começar, instale oVS Code e a MongoDB for VS Code.
Abaixo está um ótimo tutorial sobre a instalação da extensão. Ele também lista alguns comandos de shell de que precisaremos mais tarde.
Conectando um Atlas Stream Processor no Visual Studio Code

A Anatomia de um Processador de Stream 

Em uma instância de processamento de fluxo (=cluster), é possível criar e executar vários processadores de fluxo (=pipelines de processamento de fluxo) simultaneamente.
Um processador de fluxo único (SP) é muito semelhante a um pipeline de agregação do MongoDB. Ele é descrito por uma matriz de estágios de processamento. Entretanto, há algumas diferenças. O SP mais básico pode ser criado usando apenas sua fonte de dados (teremos exemplos executáveis a seguir).
1// our array of stages
2// source is defined earlier
3sp_aggregation_pipeline = [source]
4sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
Um processador de fluxo mais realista conteria pelo menos um estágio de agregação, e pode haver um grande número de estágios executando várias operações no fluxo de dados de entrada. Há um limite gratuito de 16MB para o tamanho total do processador.
1sp_aggregation_pipeline = [source, stage_1, stage_2...]
2sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
Para aumentar a velocidade do loop de desenvolvimento, há uma função sp.process() que inicia um processador de fluxo efêmero que não persistirá em sua instância de processamento de fluxo.

Vamos executar um processador de fluxo!

Vamos criar processadores de fluxo básicos e construir nosso caminho. Primeiro, precisamos ter alguns dados! O Atlas Stream Processing oferece suporte a várias fontes de dados para eventos de streaming recebidos. Essas fontes incluem:
  • Declaração de dados codificados em $source.
  • Fluxos do Kafka.
  • MongoDB Atlas databases.

Dados codificados na declaração $source

Para testes rápidos ou exemplos independentes, ter um pequeno conjunto de dados codificados é uma maneira muito conveniente de produzir eventos. Podemos declarar uma matriz de eventos. Aqui está um exemplo extremamente simples e observe que faremos alguns ajustes posteriormente para cobrir diferentes casos de uso.

Processador de fluxo mais simples

No VS Code, executamos um processador de fluxo efêmero com sp.process(). Dessa forma, não precisamos usarsp.createStreamProcessor() esp.
.drop() constantemente, como faria com os SPs que deveriam ser salvos permanentemente na instância.
1src_hard_coded = {
2  $source: {
3    // our hard-coded dataset
4    documents: [
5      {'id': 'entity_1', 'value': 1},
6      {'id': 'entity_1', 'value': 3},
7      {'id': 'entity_2', 'value': 7},
8      {'id': 'entity_1', 'value': 4},
9      {'id': 'entity_2', 'value': 1}
10     ]
11    }
12  }
13sp.process( [src_hard_coded] );
Ao executar este playground, devemos ver os dados aparecendo na aba "OUTPUT" do VS Code (CTRL+SHIFT+U para fazer com que apareçam)
Observação: pode levar alguns segundos para que o SP seja carregado e executado, portanto, não espere uma saída imediata.
1{
2  id: 'entity_1',
3  value: 1,
4  _ts: 2024-02-14T18:52:33.704Z,
5  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
6}
7{
8  id: 'entity_1',
9  value: 3,
10  _ts: 2024-02-14T18:52:33.704Z,
11  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
12}
13...
Este SP simples pode ser usado para garantir que os dados cheguem ao SP e que não haja problemas up stream com nossa fonte. Os dados de registros de data e hora foram gerados no momento da ingestão.

Agregação de processamento de fluxo

Com base no que temos, é fácil adicionar um pipeline de agregação simples ao nosso SP. Abaixo, estamos adicionando um estágio $group para agregar/acumular o campo " value " das mensagens recebidas em uma matriz para o intervalo solicitado.
Observe que o estágio "w" (w significa "Window") do pipeline SP contém um pipeline de agregação interno. Com o processamento de fluxo, temos pipelines de agregação no pipeline de processamento de fluxo.
Esse estágio apresenta um $tumblingWindow que define o período de tempo em que a agregação será executada. Lembre-se de que os fluxos devem ser contínuos, portanto, uma janela é semelhante a um buffer.
intervalo define a duração de tempo de uma janela. Como a janela é um fluxo contínuo de dados, só podemos agregar em uma fatia de cada vez.
idleTimeout define por quanto tempo o $source pode permanecer ocioso antes de fechar a janela. Isso é útil se o fluxo não for sustentado.
1src_hard_coded = {
2  $source: {
3    // our hard-coded dataset
4    documents: [
5      {'id': 'entity_1', 'value': 1},
6      {'id': 'entity_1', 'value': 3},
7      {'id': 'entity_2', 'value': 7},
8      {'id': 'entity_1', 'value': 4},
9      {'id': 'entity_2', 'value': 1}
10     ]
11    }
12  }
13
14w = {
15  $tumblingWindow: {
16    // This is the slice of time we want to look at every iteration
17    interval: {size: NumberInt(2), unit: "second"},
18    // If no additional data is coming in, idleTimeout defines when the window is forced to close
19    idleTimeout : {size: NumberInt(2), unit: "second"},
20    "pipeline": [
21      {
22        '$group': {
23            '_id': '$id',
24            'values': { '$push': "$value" }
25          }
26        }
27      ]
28    }
29  }
30sp_pipeline = [src_hard_coded, w];
31sp.process( sp_pipeline );
Deixe-o rodar por alguns segundos e devemos obter uma saída semelhante à seguinte. $group criará um documento por campo " id " de entrada e agregará os valores relevantes em um novo campo de matriz, " values. "
1{
2  _id: 'entity_2',
3  values: [ 7, 1 ],
4  _stream_meta: {
5    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
6    windowEndTimestamp: 2024-02-14T19:29:48.000Z
7  }
8}
9{
10  _id: 'entity_1',
11  values: [ 1, 3, 4 ],
12  _stream_meta: {
13    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
14    windowEndTimestamp: 2024-02-14T19:29:48.000Z
15  }
16}
Dependendo das configurações de $tumblingWindow, a agregação produzirá vários documentos que correspondam aos carimbos de data/hora. Por exemplo, essas configurações...
1...
2$tumblingWindow: {
3    interval: {size: NumberInt(10), unit: "second"},
4    idleTimeout : {size: NumberInt(10), unit: "second"},
5...
... produzirá o seguinte resultado de agregação:
1{
2  _id: 'entity_1',
3  values: [ 1 ],
4  _stream_meta: {
5    windowStartTimestamp: 2024-02-13T14:51:30.000Z,
6    windowEndTimestamp: 2024-02-13T14:51:40.000Z
7  }
8}
9{
10  _id: 'entity_1',
11  values: [ 3, 4 ],
12  _stream_meta: {
13    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
14    windowEndTimestamp: 2024-02-13T14:51:50.000Z
15  }
16}
17{
18  _id: 'entity_2',
19  values: [ 7, 1 ],
20  _stream_meta: {
21    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
22    windowEndTimestamp: 2024-02-13T14:51:50.000Z
23  }
24}
Veja como os campos windowStartTimestamp e windowEndTimestamp mostram os intervalos de 10segundos conforme solicitado (14:51:30 a 14:51:40 etc.).

Recursos adicionais de aprendizado: construindo agregações

Atlas Stream Processing usa a API de query MongoDB. Você pode saber mais sobre a API de query do MongoDB com a documentação oficial da API de query , o curso interativo[grátis] e o tutorial.
Importante: os pipelines de agregação de processamento de fluxo não oferecem suporte a todas as operações de agregação de banco de dados e possuem operadores adicionais específicos para streaming, como $tblingWindow. Verifique a documentação oficial de aggregation do Stream Processing.

Adicionar carimbos de data/hora aos dados

Mesmo quando codificamos os dados, há uma oportunidade de fornecer um registro de data e hora caso queiramos realizar operações de $sort e imitar melhor um caso de uso real. Isso seria o equivalente a um registro de data e hora do evento incorporado à mensagem.
Existem muitos outros tipos de carimbos de data/hora se usarmos um stream Kafka ao vivo (atribuído pelo produtor, do lado do servidor, tempo de ingestão e muito mais). Adicione um carimbo de data/hora às nossas mensagens e use a propriedade "timeField" do documento para torná-lo o carimbo de data/hora do fluxo oficial.
1src_hard_coded = {
2  $source: {
3    // define our event "timestamp_gps" as the _ts
4    timeField: { '$dateFromString': { dateString: '$timestamp_msg' } },
5    // our hard-coded dataset
6    documents: [
7      {'id': 'entity_1', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:39.402336'},
8      {'id': 'entity_1', 'value': 3, 'timestamp_msg': '2024-02-13T14:51:41.402674'},
9      {'id': 'entity_2', 'value': 7, 'timestamp_msg': '2024-02-13T14:51:43.402933'},
10      {'id': 'entity_1', 'value': 4, 'timestamp_msg': '2024-02-13T14:51:45.403352'},
11      {'id': 'entity_2', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:47.403752'}
12     ]
13    }
14  }
Neste ponto, temos tudo o que precisamos para testar novos pipelines e criar provas de conceito de uma forma conveniente e independente. Em um artigo subsequente, demonstraremos como se conectar a várias fontes de streaming.

Dicas e truques

No momento da publicação da publicação, o Atlas Stream Processing está em pré-visualização pública e há uma série de limitações conhecidas do Stream Processing que você deve estar ciente, como disponibilidade do centro de dados regional, conectividade com outros projetos do Atlas e privilégios de usuário.
Ao executar um processador de fluxo efêmero via sp.process(), muitos erros (problema de serialização de JSON, dados atrasados, divisão por zero, erros de $validate) que poderiam ter ido para uma fila de letras mortas (DLQ) são enviados para a saída padrão para ajudá-lo a depurar.
Para SPs criados com max.createStreamProcessor(), você terá que configurar seu DLQ manualmente. Consulte a documentação para isso. Na página de documentação "Gerenciar o processador de fluxo", procure por "Definir um DLQ".
Após mesclar dados em um banco de dados Atlas, é possível usar as ferramentas de construção de agregação de pipeline existentes no construtor da GUI do Atlas ou MongoDB Compass para criar e depurar pipelines. Como essas ferramentas são destinadas à API do banco de dados principal, lembre-se de que alguns operadores não são suportados pelos processadores de stream, e recursos de streaming, como janelas, não estão disponíveis no momento.

Conclusão

Com isso, você deve ter tudo o que precisa para colocar seu primeiro processador de stream em funcionamento. Em uma postagem futura, entraremos em mais detalhes na conexão a diferentes fontes de dados para seus processadores de stream.
Se você tiver alguma dúvida, compartilhe-a em nosso fórum da comunidade, encontre-se conosco durante os Grupos de usuários locais do MongoDB (MEGs) ou confira um de nossos eventoslocais do MongoDB .

Referências

Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Iniciar a conversa

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Início rápido

Instâncias sem servidor do MongoDB Atlas: início rápido


Aug 13, 2024 | 4 min read
Tutorial

Aprimoramento da precisão do LLM usando o Atlas Vector Search do MongoDB e os metadados Unstructured.io


Dec 04, 2023 | 12 min read
Artigo

Implementação de pipelines RAG robustos: integração do Gemma 2 do Google (2B) técnicas de avaliação do MongoDB e LLM


Sep 12, 2024 | 20 min read
Tutorial

Introdução ao MongoDB e C


Sep 17, 2024 | 12 min read
Sumário