Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$hoppingWindow

Nesta página

  • Definição
  • Sintaxe
  • Comportamento
  • Exemplos

O $hoppingWindow estágio especifica uma janela de salto para agregação de dados. As janelas do Atlas Stream Processing têm estado, podem ser recuperadas se interrompidos e possuem mecanismos para processar dados que chegam atrasados. Você deve aplicar todas as outras consultas de agregação aos seus dados de streaming dentro desse estágio de janela.

$hoppingWindow

Um estágio de pipeline do $hoppingWindow tem a seguinte forma de protótipo:

{
"$hoppingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

O estágio $hoppingWindow recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
interval
documento
Obrigatório

Documento que especifica o intervalo de uma janela de salto como uma combinação de um tamanho e uma unidade de tempo em que:

  • O valor de size deve ser um número inteiro positivo diferente de zero.

  • O valor de unit pode ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por exemplo, um size de 20 e um unit de second define cada janela para permanecer aberta por 20 segundos.

hopSize
documento
Obrigatório

Documento que especifica o comprimento do salto entre os horários de início da janela como uma combinação de size e unit de tempo em que:

  • O valor de size deve ser um número inteiro positivo diferente de zero.

  • O valor de unit pode ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por exemplo, um size de 10 e um unit de second define um salto de 10 segundos entre os horários de início da janela.

pipeline
array
Obrigatório
Pipeline de agregação aninhado avaliado em relação às mensagens dentro da janela.
offset
documento
Opcional

Documento que especifica uma compensação de horário para os limites da janela em relação ao UTC. O documento é uma combinação do campo de tamanho offsetFromUtc e uma unidade de tempo em que:

  • O valor de offsetFromUtc deve ser um número inteiro positivo diferente de zero.

  • O valor de unit deve ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

Por exemplo, um offsetFromUtc de 8 e um unit de hour gera limites que são deslocados oito horas à frente do UTC. Se você não especificar um deslocamento, os limites da janela se alinharão com UTC.

idleTimeout
documento
Opcional

Documento que especifica quanto tempo esperar antes de fechar Windows se $source estiver ocioso. Defina essa configuração como uma combinação de size e unit de tempo em que:

  • O valor de size deve ser um número inteiro positivo diferente de zero.

  • O valor de unit pode ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Se você definir idleTimeout, o Atlas Stream Processing fechará as Windows abertas somente se $source estiver ocioso por mais tempo do que o maior entre o tempo restante da janela ou o tempo idleTimeout. O temporizador ocioso começa assim que $source fica ocioso.

Por exemplo, considere uma janela 12:00 pm a 1:00 pm e idleTimeout tempo de 2 horas. Se o último evento for às 12:02 pm, após o qual $source fica inativo, o tempo restante da janela é 58 minutos. O Atlas Stream Processing fecha a janela após 2 horas de inatividade às 2:02 pm, que é maior que o tempo restante da janela e o tempo idleTimeout . Se o tempo idleTimeout estiver definido para apenas 10 minutos, o Atlas Stream Processing fechará a janela após 58 minutos de inatividade às 1:00 pm, que é mais longo que o tempo idleTimeout e a janela restante tempo.

allowedLateness
documento
Opcional
Documento que especifica por quanto tempo manter abertas as janelas geradas a partir da fonte para aceitar dados que chegam tarde após o processamento dos documentos até o horário de término da janela. Se omitido, o padrão é 3 segundos.

O Atlas Stream Processing permite apenas um estágio de janela por pipeline.

Quando você aplica o estágio $group ao estágio da janela, uma única chave de grupo tem um limite de 100 megabytes de RAM.

A compatibilidade para determinados estágios de agregação pode ser limitada ou indisponível nas janelas. Para saber mais, consulte Estágios de pipeline de agregação com compatibilidade.

No caso de uma interrupção de serviço, você pode retomar o pipeline interno de uma janela a partir do estado em que se encontrava no momento da interrupção. Para saber mais, consulte checkpoints.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o Apache Kafka que coleta esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores.

  2. O estágio $hoppingWindow define janelas de tempo sobrepostas com 100 segundos de duração e que começam a cada 20 segundos. Cada janela executa um pipeline interno que encontra a média liquidPrecipitation.depth, conforme definido nos sample_weatherdata documentos transmitidos pelo corretor Apache Kafka, durante a duração de uma determinada janela. Em seguida, o pipeline gera um único documento com um _id equivalente ao carimbo de data/hora de início da janela que ele representa e o averagePrecipitation dessa janela.

  3. O estágio $merge grava a saída na coleção do Atlas chamada stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
// The resulting document's _id is the $hoppingWindow's start timestamp
_id: "$_stream_meta.window.start",
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:20.000Z'),
end: ISODate('2024-08-28T19:32:00.000Z')
}
},
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:40.000Z'),
end: ISODate('2024-08-28T19:32:20.000Z')
}
},
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:00.000Z'),
end: ISODate('2024-08-28T19:32:40.000Z')
}
},
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:20.000Z'),
end: ISODate('2024-08-28T19:33:00.000Z')
}
},
averagePrecipitation: 2378.374061433447
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$lookup