$hoppingWindow
Nesta página
Definição
O estágio $hoppingWindow
especifica umajanela de salto para agregação de dados. As janelas do Atlas Stream Processing são com estado, podem ser recuperadas se interrompidas e têm mecanismos para processar dados atrasados. Você deve aplicar todas as outras queries de agregação aos seus dados de streaming dentro desse estágio da janela.
$hoppingWindow
Um estágio de pipeline do
$hoppingWindow
tem a seguinte forma de protótipo:{ "$hoppingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
Sintaxe
O estágio $hoppingWindow
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
interval | documento | Obrigatório | Documento que especifica o intervalo de uma janela de salto como uma combinação de um tamanho e uma unidade de tempo em que:
Por exemplo, um |
hopSize | documento | Obrigatório | Documento que especifica o comprimento do salto entre os horários de início da janela como uma combinação de
Por exemplo, um |
pipeline | array | Obrigatório | Pipeline de agregação aninhado avaliado em relação às mensagens dentro da janela. |
offset | documento | Opcional | Documento que especifica uma compensação de horário para os limites da janela em relação ao UTC. O documento é uma combinação do campo de tamanho
Por exemplo, um |
idleTimeout | documento | Opcional | Documento que especifica quanto tempo esperar antes de fechar Windows se
Se você definir Por exemplo, considere uma janela 12:00 pm a 1:00 pm e |
allowedLateness | documento | Opcional | Documento que especifica por quanto tempo manter as janelas geradas a partir da origem abertas para aceitar dados que chegam atrasados após o processamento de documentos para o tempo final da janela. Se omitido, o padrão é 3 segundos. |
Comportamento
O Atlas Stream Processing permite apenas um estágio de janela por pipeline.
Quando você aplica o estágio $group
ao estágio da janela, uma única chave de grupo tem um limite de 100 megabytes de RAM.
O suporte para determinados estágios de agregação pode ser limitado ou indisponível no Windows. Para saber mais, consulte Estágios de aggregation pipeline suportados.
No caso de uma interrupção de serviço, você pode retomar o pipeline interno de uma janela de seu estado no ponto de interrupção. Para saber mais, consulte Checkpoints.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados a partir de vários locais, em conformidade com o esquema do Conjunto de dados meteorológicos de amostra. A seguinte aggregation tem três estágios:
O
$source
estágio estabelece uma conexão com o Apache Kafka o corretor coleta esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é ingerido para os estágios de agregação subsequentes.O estágio define janelas sobrepostas de tempo que
$hoppingWindow
têm 100 segundos de duração e que começam a cada 20 segundos. Cada janela executa um internopipeline
que encontra a médialiquidPrecipitation.depth
, conforme definido nossample_weatherdata
documentos transmitidos do Apache Kafka corretor, durante a duração de uma determinada janela. Empipeline
seguida, o gera um único documento com um_id
equivalente ao carimbo de data/hora de início da janela que representa e oaveragePrecipitation
para essa janela.O estágio
$merge
escreve o resultado em uma collection do Atlas chamadastream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados ou coleção, o Atlas os criará.
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { // The resulting document's _id is the $hoppingWindow's start timestamp _id: "$_stream_meta.window.start", averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
Para visualizar os documentos na coleção sample_weatherstream.stream
resultante, conecte-se ao cluster do Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:20.000Z'), end: ISODate('2024-08-28T19:32:00.000Z') } }, averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:40.000Z'), end: ISODate('2024-08-28T19:32:20.000Z') } }, averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:00.000Z'), end: ISODate('2024-08-28T19:32:40.000Z') } }, averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:20.000Z'), end: ISODate('2024-08-28T19:33:00.000Z') } }, averagePrecipitation: 2378.374061433447 }
Observação
O anterior é um exemplo representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.