Windows do processador de fluxo
Nesta página
As janelas do Atlas Stream Processing são fases de pipeline de agregação que capturam subconjuntos de um fluxo de dados com limite de tempo, permitindo que você execute operações que exigem entradas finitas em dados de streaming.
Considere o exemplo de processador de fluxo descrito aqui. O estágio $match
pode operar diretamente no fluxo de dados extraídos por $source
, verificando cada documento em relação aos critérios de correspondência à medida que o processador de fluxo o ingere.
Por outro lado, o estágio $group
e os vários cálculos estatísticos contidos nele não podem operar em dados ilimitados, pois é impossível determinar valores mínimos, máximos, médios ou medianos sem primeiro limitar o conjunto de valores a serem considerados. Muitos operadores não matemáticos, como $push e $top, também exigem dados limitados.
Um processador de stream fornece uma janela a esses limites. Uma janela se abre, e todos os documentos que o processador de fluxo ingere se acumulam no estado dessa janela até que um intervalo de tempo predefinido termine e a janela se feche. A janela agrupa todos os documentos que captura durante esse intervalo e passa esse conjunto por seu pipeline interno. De dentro desse pipeline, os documentos em lote são indistinguíveis dos dados em repouso.
Atlas Stream Processing oferece suporte para Tbling Windows e Hopping Windows.
Windows girando
As Windows em queda são Windows definidas inteiramente pelos intervalos de tempo que capturam. Esses intervalos de tempo não se sobrepõem.
Exemplo
Você define uma janela em cascata com um intervalo de 3 segundos. Quando você inicia o processador de stream:
Uma janela abre por 3 segundos.
A primeira janela captura todos os documentos que o fluxo gera dentro desses 3 segundos.
Após 3 segundos decorridos, a janela fecha e aplica sua lógica de agregação a todos os documentos nessa janela.
Se você configurar
allowedLateness
, o Atlas Stream Processing gravará mensagens que chegam tarde na dead letter queue (DLQ) após o fechamento da janela.Uma nova janela abre assim que a primeira é fechada e captura documentos do fluxo pelos próximos 3 segundos.
As Windows em cascata garantem a captura abrangente de fluxos de dados sem o processamento repetido de documento individuais.
Windows no salto
Windows de salto são Windows definidas pelo intervalo de tempo que capturam e pelo intervalo entre a abertura de cada janela, chamado de salto. Como a duração é dissociada da frequência, você pode configurar o salto do Windows para se sobrepor ou se separar.
Para definir uma janela de salto com sobreposição, defina um salto menor que o intervalo.
Exemplo
Você define uma janela de salto com um intervalo de 20 segundos e um salto de 5 segundos. Quando você inicia o processador de stream:
Uma janela abre por 20 segundos.
A primeira janela captura todos os documentos que o fluxo gera dentro desses 20 segundos.
5 segundos depois, outra janela abre e captura todos os documentos nos próximos 20 segundos. Como a primeira Windows ainda está aberta, todos os documento gerados pelo fluxo nos próximos 15 segundos são capturados por ambas as Windows.
20 segundos após a abertura da primeira janela, ela fecha e aplica sua lógica de agregação a todos os documentos nessa janela.
5 segundos depois, a segunda janela fecha e aplica sua lógica de agregação a todos os documentos nessa janela, incluindo aqueles que já estavam sujeitos à lógica de agregação na primeira janela.
Se você configurar allowedLateness
, o Atlas Stream Processing gravará mensagens que chegam tarde na dead letter queue (DLQ) após o fechamento da janela.
Para definir uma janela de salto com espaçamento, defina um salto maior que o intervalo.
Exemplo
Você define uma janela de salto com um intervalo de 3 segundos e um salto de 5 segundos. Quando você inicia um processador de stream:
Uma janela abre por 3 segundos.
A primeira janela captura todos os documentos nos próximos 3 segundos.
Após 3 segundos decorridos, a janela fecha e aplica sua lógica de agregação a todos os documentos nessa janela.
A próxima janela é aberta após mais 2 segundos.
O Atlas Stream Processing não processa nenhum documento gerado pelo stream durante esses 2 segundos.
Tempo Atlas Stream Processing
No processamento de dados em streaming, os documentos estão sujeitos a dois sistemas de temporização:
Hora do evento - A hora em que o fluxo de origem gera um documento ou o sistema de mensagens (por exemplo, Apache Kafka) recebe o documento. Isso é verificado pelo registro de data e hora do documento.
Tempo de processamento - O tempo em que o processador de fluxo consome um documento. Isso é verificado pelo relógio do sistema que hospeda o processador de fluxo.
A latência da rede, o processamento inicial e outros fatores podem não apenas causar discrepâncias entre esses horários em um determinado documento, mas também podem fazer com que os documentos cheguem a um processador de fluxo fora da ordem do evento. Em qualquer caso, o Windows pode perder documentos que você pretende que eles registrem. O Atlas Stream Processing considera esses documentos como atrasados e os envia para sua fila de mensagens não entregues, se você configurar uma.
O Atlas Stream Processing oferece vários mecanismos que alteram o comportamento da janela para atenuar esses problemas.
Marcas d'água
Uma marca dágua substitui o tempo de processamento e atualiza somente quando o processador consome um documento com um tempo de evento posterior ao de qualquer documento consumido anteriormente. Todos os processadores de stream aplicam marcas d'agua no Atlas Stream Processing.
Exemplo
Você configura um processador de stream com o Windows de 5minutos. Você inicia o processador em 12:00
, de modo que as duas primeiras Windows tenham durações de 12:00-12:05
e 12:05-12:10
. A tabela a seguir ilustra qual Windows capturará quais eventos dados atrasos variáveis, com e sem marcas d''d'gua.
Hora do evento | Tempo em processamento | Tempo de janela (sem marcas d'gua) | Tempo de janela (marcas d'água) |
---|---|---|---|
12:00 | 12:00 | 12:00-12:05 | 12:00-12:05 |
12:00 | 12:01 | 12:00-12:05 | 12:00-12:05 |
12:01 | 12:03 | 12:00-12:05 | 12:00-12:05 |
12:03 | 12:04 | 12:00-12:05 | 12:00-12:05 |
12:02 | 12:05 | 12:05-12:10 | 12:00-12:05 |
12:01 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:04 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:05 | 12:07 | 12:05-12:10 | 12:05-12:10 |
12:06 | 12:07 | 12:05-12:10 | 12:04-12:10 |
12:06 | 12:08 | 12:05-12:10 | 12:05-12:10 |
No cenário em que as marcas d'gua não se aplicam, a janela 12:00-12:05
fecha às 12:05
, de acordo com o relógio do sistema da instância do Atlas Stream Processing , e a janela 12:05-12:10
abre imediatamente. Como resultado, embora a origem tenha gerado sete dos documentos durante o intervalo 12:00-12:05
, a janela relevante captura apenas quatro documentos.
No cenário em que as marcas d'água se aplicam, a janela 12:00-12:05
não fecha em 12:05
porque, entre os documentos que ela ingere até esse ponto, a hora do evento mais recente — e, portanto, o valor da marca d'água — é 12:03
. A janela 12:00-12:05
não é fechada até 12:07
no relógio do sistema, quando o processador de fluxo ingere um documento com uma hora de evento de 12:05
, avança a marca d'água para essa hora e abre a janela 12:05-12:10
. Cada janela captura todos os documentos apropriados.
Atrasos permitidos
Se as diferenças entre o tempo do evento e o tempo de processamento variarem o suficiente, os documentos poderão chegar a um processador de fluxo depois que a marca d''d'''d''gua tiver avançado o suficiente para fechar a janela esperada. Para mitigar isso, o Atlas Stream Processing suporta o Lateness permitido, uma configuração que atrasa o fechamento de uma janela por um intervalo definido em relação à marca d'gua.
Enquanto as marcas d'água são propriedades dos processadores de fluxo, o atraso permitido é uma propriedade de uma janela e afeta somente quando essa janela é fechada. Se a marca d'água do processador de fluxo avançar a um ponto que acione uma nova janela, o atraso permitido mantém as janelas anteriores abertas sem evitar isso.
Exemplo
Você configura um processador de fluxo com 5minutos girando Windows. Você inicia o processador em 12:00
, de modo que as duas primeiras Windows tenham durações de 12:00-12:05
e 12:05-12:10
. Você definiu um atraso permitido de 2 minutos.
A tabela abaixo reflete a ordem na qual o processador de fluxo ingere os documentos descritos.
Hora do evento | Marca d'água | Tempo de atraso permitido | Janela de tempo |
---|---|---|---|
12:00 | 12:00 | 11:58 | 12:00-12:05 |
12:01 | 12:01 | 11:59 | 12:00-12:05 |
12:03 | 12:03 | 12:01 | 12:00-12:05 |
12:02 | 12:03 | 12:01 | 12:00-12:05 |
12:04 | 12:04 | 12:02 | 12:00-12:05 |
12:01 | 12:04 | 12:02 | 12:00-12:05 |
12:05 | 12:05 | 12:03 | 12:00-12:15, 12:05-12:10 |
12:06 | 12:06 | 12:04 | 12:00-12:05, 12:05-12:10 |
12:04 | 12:06 | 12:04 | 12:00-12:05, 12:05-12:10 |
12:07 | 12:07 | 12:05 | 12:05-12:10 |
Quando a marca d'água avança para 12:05
, a janela 12:05-12:10
se abre. No entanto, como o intervalo de atraso permitido é de 2 minutos, de dentro da janela 12:00-12:05
, ele é efetivamente apenas 12:03
, portanto, permanece aberto. Somente quando a marca d'água avança para 12:07
é que o tempo ajustado chega a 12:05
. Nesse ponto, a janela 12:00-12:05
é fechada.
Tempo limite de inatividade
O desacoplamento do comportamento de janela do tempo de processamento por padrão melhora a correção do processamento de fluxo na maioria dos casos. No entanto, uma fonte de dados de streaming pode ter períodos prolongados de inatividade. Nesse cenário, uma janela pode capturar eventos antes do período de ociosidade e não conseguir retornar os resultados processados enquanto espera que a marca d'água avance o suficiente para fechar.
O Atlas Stream Processing permite que os usuários configurem um tempo limite de inatividade para o Windows para mitigar esses cenários usando o tempo de processamento. Um tempo limite de ociosidade é um intervalo de tempo que começa quando o tempo de processamento passa do final do intervalo de uma janela aberta e a fonte do processador de fluxo está ociosa. Se a fonte permanecer ociosa por um intervalo igual ao tempo limite de ociosidade, a janela será fechada e a marca d''d'''''ausência independente de qualquer ingestão de documento.
Exemplo
Você configura uma janela em cascata com um intervalo de 3 minutos e um tempo limite de ociosidade de 1 minuto. A tabela a seguir ilustra os efeitos do tempo limite de ociosidade durante e após o intervalo de uma janela.
Tempo em processamento | Hora ou status do evento | Marca d'água | Janela de tempo |
---|---|---|---|
12:00 | 12:00 | 12:00 | 12:00-12:03 |
12:01 | Fonte ociosa | 12:00 | 12:00-12:03 |
12:02 | Fonte ociosa | 12:00 | 12:00-12:03 |
12:03 | Fonte ociosa | 12:00 | 12:00-12:03 |
12:04 | 12:02 | 12:02 | 12:00-12:03 |
12:05 | 12:05 | 12:05 | 12:03-12:06 |
12:06 | Fonte ociosa | 12:05 | 12:03-12:06 |
12:07 | Fonte ociosa | 12:00 | 12:06-12:09 |
12:08 | Fonte ociosa | 12:00 | 12:06-12:09 |
12:09 | 12:09 | 12:09 | 12:09-12:12 |
Durante o intervalo 12:00-12:03
, a fonte fica inativa por três minutos, mas o processador de stream não fecha a janela porque o tempo de processamento não passou do final do intervalo da janela e a fonte não permanece inativa após o término do intervalo da janela. Quando a marca d'água avança para 12:05
, a janela fecha normalmente e a janela 12:03-12:06
se abre.
Quando a fonte fica inativa em 12:06
, ela permanece inativa em 12:07
, acionando o tempo limite de inatividade e avançando a marca d' d'gua para 12:06
.