Explorando operadores de janela no processamento de fluxo Atlas
Robert Walters4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
Avalie esse Tutorial
Em nossa postagem anterior sobre janelamento, apresentamos os operadores de janela disponíveis no Atlas Stream Processing. Os operadores de janela são uma das operações mais comumente usadas para processar dados de streaming de forma eficaz. O Atlas Stream Processing fornece dois operadores de janela: $tblingWindow e $hoppingWindow. Neste tutorial, exploraremos ambos os operadores usando o gerador de dados solar de amostra fornecido no Atlas Stream Processing.
Antes de começarmos a criar processadores de fluxo, verifique se você tem um usuário do bancode dados que tenha acesso "atlasAdmin " ao Atlas. Além disso, se você ainda não tiver uma Instância de Atlas Stream Processing criada com uma conexão com o gerador de dados sample_stream_solar, siga as instruções em Introdução ao Atlas Stream Processing: Criando Seu Primeiro Processador de Stream e continue.
Primeiro, confirme se sample_stream_solar foi adicionado como uma conexão, emitindo
sp.listConnections()
.1 AtlasStreamProcessing> sp.listConnections() 2 { 3 ok: 1, 4 connections: [ 5 { 6 name: 'sample_stream_solar', 7 type: 'inmemory', 8 createdAt: ISODate("2023-08-26T18:42:48.357Z") 9 } 10 ] 11 }
Em seguida, vamos definir um estágio$source para descrever de onde o Atlas Stream Processing lerá os dados de stream.
1 var solarstream={ $source: { "connectionName": "sample_stream_solar" } }
Em seguida, emita um comando.process para visualizar o conteúdo do stream no console.
1 sp.process([solarstream])
Você verá o fluxo de dados solar impresso no console. Uma amostra desses dados é a seguinte:
1 { 2 device_id: 'device_2', 3 group_id: 3, 4 timestamp: '2023-08-27T13:51:53.375+00:00', 5 max_watts: 250, 6 event_type: 0, 7 obs: { 8 watts: 168, 9 temp: 15 10 }, 11 _ts: ISODate("2023-08-27T13:51:53.375Z"), 12 _stream_meta: { 13 sourceType: 'sampleData', 14 timestamp: ISODate("2023-08-27T13:51:53.375Z") 15 } 16 }
Uma janela em cascata é uma janela de tamanho fixo que avança no tempo em intervalos regulares. No Atlas Stream Processing, você usa o operador$tumblingWindow. Neste exemplo, vamos usar o operador para calcular a média de watts em intervalos de um minuto.
Consulte novamente o esquema a partir dos dados solar do fluxo de amostra. Para criar uma janela caída, vamos criar uma variável e definir nosso estágio de janela caída.
1 var Twindow= { 2 $tumblingWindow: { 3 interval: { size: NumberInt(1), unit: "minute" }, 4 pipeline: [ 5 { 6 $group: { 7 _id: "$device_id", 8 max: { $max: "$obs.watts" }, 9 avg: { $avg: "$obs.watts" } 10 } 11 } 12 ] 13 } 14 }
Estamos calculando o valor máximo e a média ao longo de intervalos não sobrepostos de um minuto. Vamos usar o comando
.process
para executar a query de streaming em primeiro plano e ver nossos resultados no console.1 sp.process([solarstream,Twindow])
Aqui está um exemplo de saída da declaração:
1 { 2 _id: 'device_4', 3 max: 236, 4 avg: 95, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z") 9 } 10 } 11 { 12 _id: 'device_2', 13 max: 211, 14 avg: 117.25, 15 _stream_meta: { 16 sourceType: 'sampleData', 17 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"), 18 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z") 19 } 20 }
O pipeline usado em uma função de janela pode incluir estágios de bloqueio e estágios sem bloqueio.
Operadores de acumuladores, como
$avg
, $count
, $sort
e $limit
, podem ser usados em estágios de bloqueio. Dados significativos retornados desses operadores são obtidos quando executados em uma série de dados em comparação com um único ponto de dados. É por isso que eles são considerados bloqueadores.Os estágios sem bloqueio não exigem que vários pontos de dados sejam significativos e incluem operadores como
$addFields
, $match
, $project
, $set
, $unset
e $unwind
, para citar alguns. Você pode usar o não bloqueio antes, depois ou dentro dos estágios de bloqueio. Para ilustrar isso, vamos criar uma consulta que mostre a média, o máximo e o delta (a diferença entre o máximo e a média). Usaremos um $match sem bloqueio para mostrar apenas os resultados de device_1, calcularemos o tumblingWindow mostrando máximo e média e, em seguida, incluiremos outro$addFields
sem bloqueio .1 var m= { '$match': { device_id: 'device_1' } }
1 var Twindow= { 2 '$tumblingWindow': { 3 interval: { size: Int32(1), unit: 'minute' }, 4 pipeline: [ 5 { 6 '$group': { 7 _id: '$device_id', 8 max: { '$max': '$obs.watts' }, 9 avg: { '$avg': '$obs.watts' } 10 } 11 } 12 ] 13 } 14 } 15 16 var delta = { '$addFields': { delta: { '$subtract': ['$max', '$avg'] } } }
Agora podemos usar o arquivo .process comando para executar o processador de fluxo em primeiro plano e visualizar nossos resultados no console.
1 sp.process([solarstream,m,Twindow,delta])
Os resultados desta consulta serão semelhantes aos seguintes:
1 { 2 _id: 'device_1', 3 max: 238, 4 avg: 75.3, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T19:11:00.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T19:12:00.000Z") 9 }, 10 delta: 162.7 11 } 12 { 13 _id: 'device_1', 14 max: 220, 15 avg: 125.08333333333333, 16 _stream_meta: { 17 sourceType: 'sampleData', 18 windowStartTimestamp: ISODate("2023-08-27T19:12:00.000Z"), 19 windowEndTimestamp: ISODate("2023-08-27T19:13:00.000Z") 20 }, 21 delta: 94.91666666666667 22 } 23 { 24 _id: 'device_1', 25 max: 238, 26 avg: 119.91666666666667, 27 _stream_meta: { 28 sourceType: 'sampleData', 29 windowStartTimestamp: ISODate("2023-08-27T19:13:00.000Z"), 30 windowEndTimestamp: ISODate("2023-08-27T19:14:00.000Z") 31 }, 32 delta: 118.08333333333333 33 }
Observe os segmentos de tempo e como eles se alinham a cada minuto.
Além disso, observe que a saída inclui a diferença entre os valores calculados de máximo e médio para cada janela.
Uma janela de salto, às vezes chamada de janela deslizante, é uma janela de tamanho fixo que avança no tempo em intervalos sobrepostos. No Atlas Stream Processing, você usa o operador
$hoppingWindow
. Neste exemplo, vamos usar o operador para ver a média.1 var Hwindow = { 2 '$hoppingWindow': { 3 interval: { size: 1, unit: 'minute' }, 4 hopSize: { size: 30, unit: 'second' }, 5 pipeline: [ 6 { 7 '$group': { 8 _id: '$device_id', 9 max: { '$max': '$obs.watts' }, 10 avg: { '$avg': '$obs.watts' } 11 } 12 } 13 ] 14 } 15 }
Para ajudar a ilustrar os segmentos de horário de início e término, vamos criar um filtro para retornar apenas device_1.
1 var m = { '$match': { device_id: 'device_1' } }
Agora, vamos emitir o comando
.process
para visualizar os resultados no console.1 sp.process([solarstream,m,Hwindow])
Um exemplo de resultado é o seguinte:
1 { 2 _id: 'device_1', 3 max: 238, 4 avg: 76.625, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T19:37:30.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T19:38:30.000Z") 9 } 10 } 11 { 12 _id: 'device_1', 13 max: 238, 14 avg: 82.71428571428571, 15 _stream_meta: { 16 sourceType: 'sampleData', 17 windowStartTimestamp: ISODate("2023-08-27T19:38:00.000Z"), 18 windowEndTimestamp: ISODate("2023-08-27T19:39:00.000Z") 19 } 20 } 21 { 22 _id: 'device_1', 23 max: 220, 24 avg: 105.54545454545455, 25 _stream_meta: { 26 sourceType: 'sampleData', 27 windowStartTimestamp: ISODate("2023-08-27T19:38:30.000Z"), 28 windowEndTimestamp: ISODate("2023-08-27T19:39:30.000Z") 29 } 30 }
Observe os segmentos de tempo.
Os segmentos de tempo são sobrepostos em 30 segundos, conforme definido pela opção hopSize. As Windows de salto são úteis para capturar padrões de curto prazo nos dados.
Ao processar dados continuamente dentro de janelas de tempo, você pode gerar insights e métricas em tempo real, que podem ser cruciais para aplicativos como monitoramento, detecção de fraudes e análise operacional. O Atlas Stream Processing fornece operadores de janela de queda e salto. Juntos, esses operadores permitem que você execute várias operações de agregação, como soma, média, mínimo e máximo, em uma janela específica de dados. Neste tutorial, você aprendeu a usar esses dois operadores com dados de amostra solar.
Confira a publicação no blog doMongoDB Atlas Stream Processing . Para saber mais sobre operadores de janela no Atlas Stream Processing, saiba mais em nossa documentação.
Faça login hoje mesmo para começar. O Atlas Stream Processing está disponível para todos os desenvolvedores no Atlas. Experimente hoje mesmo!