流处理器 Windows
Atlas Stream Processing 窗口是 聚合管道阶段,可捕获数据流的时间限制子集,让您可以对流数据执行需要有限输入的操作。
以此处描述的 示例流处理器为例。$match
阶段可直接对 $source
输入的数据流进行处理,在数据流处理器输入每份文档时,根据匹配标准对其进行检查。
相比之下,$group
阶段及其中包含的各种统计计算无法对无界数据进行操作,因为如果不首先对要考虑的值集进行边界限定,就无法确定最小值、最大值、平均值或中位数值。许多非数学操作符(如 $push 和 $top )也需要有界数据。
流处理器通过窗口为这些边界提供边界。此时将打开一个窗口,流处理器摄取的所有文档都会累积在这个窗口状态中,直到达到一个预先设定的时间间隔,窗口关闭为止。该窗口会对在该时间段内捕获的所有文档进行批处理,并通过内部管道传递文档集。在此管道中,批处理文档与静态数据无法区分。
Atlas Stream Processing 提供对Tumbling Windows和Hopping Windows 的支持。
翻滚 Windows
翻滚Windows是完全由其捕获的时间间隔定义的Windows 。 这些时间间隔不重叠。
例子
您可以定义间隔为 3 秒的滚动窗口。 当您启动流处理器时:
窗口会打开 3 秒钟。
第一个窗口捕获流在这 3 秒内生成的所有文档。
3 秒后,窗口关闭并将聚合逻辑应用于该窗口中的所有文档。
如果您配置
allowedLateness
,则 Atlas Stream Processing 会在窗口关闭后将迟到的消息写入死信队列。第一个窗口关闭后会立即打开一个新窗口,并在接下来的 3 秒内捕获数据流中的文档。
滚动窗口可确保全面捕获数据流,而无需重复处理单个文档。
跳跃Windows
跳跃Windows Windows由捕获的时间间隔和打开每个窗口之间的间隔(称为跳跃)定义的窗口。 由于持续时间与频率分离,因此您可以将跳跃Windows配置为重叠或彼此间隔开。
要定义重叠的跳跃窗口,请设置小于间隔的跳跃。
例子
您可以定义间隔为 20 秒、跳跃为 5 秒的跳跃窗口。 当您启动流处理器时:
窗口会打开 20 秒。
第一个窗口捕获流在这 20 秒内生成的所有文档。
5 秒后,另一个窗口将打开并捕获接下来 20 秒内的所有文档。 由于第一个窗口仍处于打开状态,因此流在接下来的 15 秒内生成的所有文档都会被两个窗口捕获。
第一个窗口在打开 20 秒后关闭,并将聚合逻辑应用于该窗口中的所有文档。
5 秒后,第二个窗口将关闭,并将聚合逻辑应用于该窗口中的所有文档,包括在第一个窗口中已受聚合逻辑约束的文档。
如果您配置allowedLateness
,则 Atlas Stream Processing 会在窗口关闭后将迟到的消息写入死信队列。
要定义带间距的跳跃窗口,请设置大于间隔的跳跃。
例子
您可以定义一个间隔为 3 秒、跳跃为 5 秒的跳跃窗口。 启动流处理器时:
窗口会打开 3 秒钟。
第一个窗口捕获接下来 3 秒的所有文档。
3 秒后,窗口关闭并将聚合逻辑应用于该窗口中的所有文档。
再过 2 秒后,下一个窗口将打开。
Atlas Stream Processing 不会处理该流在这两秒内生成的任何文档。
Atlas Stream Processing时序
在流式数据处理中,文档受两个计时系统的约束:
事件时间 - 源流数据生成文档或消息系统的时间(例如Apache Kafka)接收文档。这可以通过文档的时间戳来确定。
处理时间 — 流处理器使用文档的时间。 这是通过托管流处理器的系统的时钟来确定的。
网络延迟、上游处理和其他因素不仅会导致给定文档的这些时间出现差异,还会导致文档到达流处理器的时间顺序与事件时间顺序不一致。无论哪种情况,windows 都可能错过您打算让其捕获的文档。Atlas Stream Processing 会认为此类文档延迟到达,并将它们发送到死信队列(如果您配置了死信队列)。
Atlas Stream Processing 提供了各种更改窗口行为的机制,以缓解这些问题。
水印
水印会取代处理时间,并且仅当处理器消耗的文档的事件时间晚于任何先前消耗的文档时才会进行更新。 所有流处理器都会在 Atlas Stream Processing 中应用水印。
例子
您可以使用 5 分钟的Windows配置流处理器。 您在 12:00
启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05
和 12:05-12:10
。 下表说明了在不同延迟下(带水印和不带水印),哪些Windows将捕获哪些事件。
事件时间 | 处理时间 | 窗口时间(无水印) | 窗口时间(水印) |
---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 00 | 12 : 01 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 03 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 03 | 12 : 04 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 05 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 04 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
12 : 06 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 04 - 12 : 10 |
12 : 06 | 12 : 08 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
在不应用水印的情况下,12:00-12:05
窗口会根据Atlas Stream Processing实例的系统时钟在 12:05
关闭,并立即打开 12:05-12:10
窗口。 因此,尽管源在12:00-12:05
间隔内生成了其中的七个文档,但相关窗口仅捕获了四个文档。
在应用水印的情况下,12:00-12:05
窗口不会在 12:05
处关闭,因为在它接收的文档中,最新的事件时间(即水印值)是 12:03
。直到系统时钟上的 12:07
,流处理器接收到事件时间为 12:05
的文档,将水印提前到该时间,并打开 12:05-12:10
窗口时, 12:00-12:05
窗口才会关闭。每个窗口都会捕获所有相应的文档。
允许迟到
如果事件时间和处理时间之间的差异足够大,则在水印已提前到足以关闭预期窗口后,文档可能会到达流处理器。 为了缓解这个问题,Atlas Stream Processing 支持“允许延迟”,该设置可将窗口关闭延迟相对于水印的设定时间间隔。
水印是流处理器的属性,而“允许延迟”是窗口的属性,并且仅在该窗口关闭时才会产生影响。如果流处理器的水印前进到会触发打开新窗口的点,则“允许延迟”会使较早的窗口保持打开状态,而不会阻止此操作。
例子
您可以使用 5 分钟的滚动Windows来配置流处理器。 您在 12:00
启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05
和 12:05-12:10
。 您将允许迟到时间设置为2分钟。
下表反映了流处理器摄取所述文件的顺序。
事件时间 | 水印 | 允许迟到时间 | 窗口时间 |
---|---|---|---|
12 : 00 | 12 : 00 | 11 : 58 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 01 | 11 : 59 | 12 : 00 - 12 : 05 |
12 : 03 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 04 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 05 | 12 : 03 | 12:00-12:15, 12:05-12:10 |
12 : 06 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 04 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 07 | 12 : 07 | 12 : 05 | 12 : 05 - 12 : 10 |
当水印前进到 12:05
时,12:05-12:10
窗口将打开。但是,由于“允许延迟”间隔为 2 分钟,在 12:00-12:05
窗口中,它实际上仅为 12:03
,因此它保持打开状态。仅当水印前进到 12:07
时,调整后的时间才会达到 12:05
。此时,12:00-12:05
窗口将关闭。
空闲超时
默认情况下将窗口行为与处理时间分离可以在大多数情况下提高流处理的正确性。但是,流数据源可能会有长时间的空闲状态。在这种情况下,窗口可能会在空闲期之前捕获事件,并且在等待水印前进到足以关闭时无法返回处理后的结果。
Atlas Stream Processing允许用户为Windows配置空闲超时,以缓解这些使用处理时间的情况。 空闲超时是指当处理时间超过打开窗口时间间隔的终点并且流处理器的源处于空闲状态时开始的时间间隔。 如果源保持空闲的时间间隔等于空闲超时时间,则窗口将关闭,并且水印独立于任何文档摄取而前进。
例子
您可以配置具有 3 分钟间隔和 1 分钟空闲超时的滚动窗口。下表说明了窗口间隔期间和之后空闲超时的影响。
处理时间 | 事件时间或状态 | 水印 | 窗口时间 |
---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 01 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 02 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 03 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 04 | 12 : 02 | 12 : 02 | 12 : 00 - 12 : 03 |
12 : 05 | 12 : 05 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 06 | 源空闲 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 07 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 08 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 09 | 12 : 09 | 12 : 09 | 12 : 09 - 12 : 12 |
在 12:00-12:03
间隔期间,源空闲三分钟,但流处理器不会关闭窗口,因为处理时间未超过窗口间隔的结束时间,并且源在窗口间隔结束后不会保持空闲状态。当水印前进到 12:05
时,窗口将正常关闭, 12:03-12:06
窗口将打开。
当源在12:06
进入空闲状态时,它会在12:07
之前保持空闲状态,从而触发空闲超时并将水印提前到12:06
。