Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Explorando operadores de janela no processamento de fluxo Atlas

Robert Walters4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
Processamento de streamAtlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Atlas Stream Processing agora está disponível ao público em geral! Saiba mais sobre isso aqui.
Em nossa postagem anterior sobre janelamento, apresentamos os operadores de janela disponíveis no Atlas Stream Processing. Os operadores de janela são uma das operações mais comumente usadas para processar dados de streaming de forma eficaz. O Atlas Stream Processing fornece dois operadores de janela: $tblingWindow e $hoppingWindow. Neste tutorial, exploraremos ambos os operadores usando o gerador de dados solar de amostra fornecido no Atlas Stream Processing.

Começar

Antes de começarmos a criar processadores de fluxo, verifique se você tem um usuário do bancode dados que tenha acesso "atlasAdmin " ao Atlas. Além disso, se você ainda não tiver uma Instância de Atlas Stream Processing criada com uma conexão com o gerador de dados sample_stream_solar, siga as instruções em Introdução ao Atlas Stream Processing: Criando Seu Primeiro Processador de Stream e continue.

Visualize os dados de amostra do fluxo solar

Para este tutorial, usaremos o shell do MongoDB.
Primeiro, confirme se sample_stream_solar foi adicionado como uma conexão, emitindo sp.listConnections().
1AtlasStreamProcessing> sp.listConnections()
2{
3 ok: 1,
4 connections: [
5 {
6 name: 'sample_stream_solar',
7 type: 'inmemory',
8 createdAt: ISODate("2023-08-26T18:42:48.357Z")
9 }
10 ]
11}
Em seguida, vamos definir um estágio$source para descrever de onde o Atlas Stream Processing lerá os dados de stream.
1var solarstream={ $source: { "connectionName": "sample_stream_solar" } }
Em seguida, emita um comando.process para visualizar o conteúdo do stream no console.
1sp.process([solarstream])
Você verá o fluxo de dados solar impresso no console. Uma amostra desses dados é a seguinte:
1{
2 device_id: 'device_2',
3 group_id: 3,
4 timestamp: '2023-08-27T13:51:53.375+00:00',
5 max_watts: 250,
6 event_type: 0,
7 obs: {
8 watts: 168,
9 temp: 15
10 },
11 _ts: ISODate("2023-08-27T13:51:53.375Z"),
12 _stream_meta: {
13 sourceType: 'sampleData',
14 timestamp: ISODate("2023-08-27T13:51:53.375Z")
15 }
16}

Criar uma consulta de janela de rolagem

Uma janela em cascata é uma janela de tamanho fixo que avança no tempo em intervalos regulares. No Atlas Stream Processing, você usa o operador$tumblingWindow. Neste exemplo, vamos usar o operador para calcular a média de watts em intervalos de um minuto.
Consulte novamente o esquema a partir dos dados solar do fluxo de amostra. Para criar uma janela caída, vamos criar uma variável e definir nosso estágio de janela caída.
1var Twindow= {
2 $tumblingWindow: {
3 interval: { size: NumberInt(1), unit: "minute" },
4 pipeline: [
5 {
6 $group: {
7 _id: "$device_id",
8 max: { $max: "$obs.watts" },
9 avg: { $avg: "$obs.watts" }
10 }
11 }
12 ]
13 }
14}
Estamos calculando o valor máximo e a média ao longo de intervalos não sobrepostos de um minuto. Vamos usar o comando.process para executar a query de streaming em primeiro plano e ver nossos resultados no console.
1sp.process([solarstream,Twindow])
Aqui está um exemplo de saída da declaração:
1{
2 _id: 'device_4',
3 max: 236,
4 avg: 95,
5 _stream_meta: {
6 sourceType: 'sampleData',
7 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"),
8 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z")
9 }
10}
11{
12 _id: 'device_2',
13 max: 211,
14 avg: 117.25,
15 _stream_meta: {
16 sourceType: 'sampleData',
17 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"),
18 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z")
19 }
20}

Explorando o pipeline do operador de janela

O pipeline usado em uma função de janela pode incluir estágios de bloqueio e estágios sem bloqueio.
Operadores de acumuladores, como $avg, $count, $sorte $limit, podem ser usados em estágios de bloqueio. Dados significativos retornados desses operadores são obtidos quando executados em uma série de dados em comparação com um único ponto de dados. É por isso que eles são considerados bloqueadores.
Os estágios sem bloqueio não exigem que vários pontos de dados sejam significativos e incluem operadores como $addFields, $match, $project, $set, $unsete $unwind, para citar alguns. Você pode usar o não bloqueio antes, depois ou dentro dos estágios de bloqueio. Para ilustrar isso, vamos criar uma consulta que mostre a média, o máximo e o delta (a diferença entre o máximo e a média). Usaremos um $match sem bloqueio para mostrar apenas os resultados de device_1, calcularemos o tumblingWindow mostrando máximo e média e, em seguida, incluiremos outro$addFieldssem bloqueio .
1var m= { '$match': { device_id: 'device_1' } }
1var Twindow= {
2 '$tumblingWindow': {
3 interval: { size: Int32(1), unit: 'minute' },
4 pipeline: [
5 {
6 '$group': {
7 _id: '$device_id',
8 max: { '$max': '$obs.watts' },
9 avg: { '$avg': '$obs.watts' }
10 }
11 }
12 ]
13 }
14}
15
16var delta = { '$addFields': { delta: { '$subtract': ['$max', '$avg'] } } }
Agora podemos usar o arquivo .process comando para executar o processador de fluxo em primeiro plano e visualizar nossos resultados no console.
1sp.process([solarstream,m,Twindow,delta])
Os resultados desta consulta serão semelhantes aos seguintes:
1{
2 _id: 'device_1',
3 max: 238,
4 avg: 75.3,
5 _stream_meta: {
6 sourceType: 'sampleData',
7 windowStartTimestamp: ISODate("2023-08-27T19:11:00.000Z"),
8 windowEndTimestamp: ISODate("2023-08-27T19:12:00.000Z")
9 },
10 delta: 162.7
11}
12{
13 _id: 'device_1',
14 max: 220,
15 avg: 125.08333333333333,
16 _stream_meta: {
17 sourceType: 'sampleData',
18 windowStartTimestamp: ISODate("2023-08-27T19:12:00.000Z"),
19 windowEndTimestamp: ISODate("2023-08-27T19:13:00.000Z")
20 },
21 delta: 94.91666666666667
22}
23{
24 _id: 'device_1',
25 max: 238,
26 avg: 119.91666666666667,
27 _stream_meta: {
28 sourceType: 'sampleData',
29 windowStartTimestamp: ISODate("2023-08-27T19:13:00.000Z"),
30 windowEndTimestamp: ISODate("2023-08-27T19:14:00.000Z")
31 },
32 delta: 118.08333333333333
33}
Observe os segmentos de tempo e como eles se alinham a cada minuto.
Segmentos de tempo alinhados no minuto
Além disso, observe que a saída inclui a diferença entre os valores calculados de máximo e médio para cada janela.

Criar uma janela de salto

Uma janela de salto, às vezes chamada de janela deslizante, é uma janela de tamanho fixo que avança no tempo em intervalos sobrepostos. No Atlas Stream Processing, você usa o operador$hoppingWindow . Neste exemplo, vamos usar o operador para ver a média.
1var Hwindow = {
2 '$hoppingWindow': {
3 interval: { size: 1, unit: 'minute' },
4 hopSize: { size: 30, unit: 'second' },
5 pipeline: [
6 {
7 '$group': {
8 _id: '$device_id',
9 max: { '$max': '$obs.watts' },
10 avg: { '$avg': '$obs.watts' }
11 }
12 }
13 ]
14 }
15}
Para ajudar a ilustrar os segmentos de horário de início e término, vamos criar um filtro para retornar apenas device_1.
1var m = { '$match': { device_id: 'device_1' } }
Agora, vamos emitir o comando.processpara visualizar os resultados no console.
1sp.process([solarstream,m,Hwindow])
Um exemplo de resultado é o seguinte:
1{
2 _id: 'device_1',
3 max: 238,
4 avg: 76.625,
5 _stream_meta: {
6 sourceType: 'sampleData',
7 windowStartTimestamp: ISODate("2023-08-27T19:37:30.000Z"),
8 windowEndTimestamp: ISODate("2023-08-27T19:38:30.000Z")
9 }
10}
11{
12 _id: 'device_1',
13 max: 238,
14 avg: 82.71428571428571,
15 _stream_meta: {
16 sourceType: 'sampleData',
17 windowStartTimestamp: ISODate("2023-08-27T19:38:00.000Z"),
18 windowEndTimestamp: ISODate("2023-08-27T19:39:00.000Z")
19 }
20}
21{
22 _id: 'device_1',
23 max: 220,
24 avg: 105.54545454545455,
25 _stream_meta: {
26 sourceType: 'sampleData',
27 windowStartTimestamp: ISODate("2023-08-27T19:38:30.000Z"),
28 windowEndTimestamp: ISODate("2023-08-27T19:39:30.000Z")
29 }
30}
Observe os segmentos de tempo.
Segmentos de tempo sobrepostos
Os segmentos de tempo são sobrepostos em 30 segundos, conforme definido pela opção hopSize. As Windows de salto são úteis para capturar padrões de curto prazo nos dados.

Resumo

Ao processar dados continuamente dentro de janelas de tempo, você pode gerar insights e métricas em tempo real, que podem ser cruciais para aplicativos como monitoramento, detecção de fraudes e análise operacional. O Atlas Stream Processing fornece operadores de janela de queda e salto. Juntos, esses operadores permitem que você execute várias operações de agregação, como soma, média, mínimo e máximo, em uma janela específica de dados. Neste tutorial, você aprendeu a usar esses dois operadores com dados de amostra solar.

Saiba mais sobre o processamento de fluxo do MongoDB Atlas

Confira a publicação no blog doMongoDB Atlas Stream Processing . Para saber mais sobre operadores de janela no Atlas Stream Processing, saiba mais em nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Usando os embeddings mais recentes da OpenAI em um sistema RAG com o MongoDB


Jul 01, 2024 | 15 min read
Início rápido

MongoDB com agente Bedrock: tutorial rápido


Jul 01, 2024 | 6 min read
Tutorial

Como utilizar funções do Azure com MongoDB Atlas em Java


Apr 14, 2023 | 8 min read
Tutorial

Como avaliar seu aplicativo LLM


Jun 24, 2024 | 20 min read
Sumário