$hoppingWindow
定义
$hoppingWindow
阶段指定用于聚合数据的 跳跃窗口。Atlas Stream Processing 窗口是有状态的,可以在中断后恢复,并具有处理延迟到达的数据的机制。您必须在该窗口阶段中将所有其他聚合查询应用于流数据。
$hoppingWindow
$hoppingWindow
管道阶段采用以下原型形式:{ "$hoppingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
语法
$hoppingWindow
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 文档 | 必需 | 以大小和时间单位的组合形式指定跳跃窗口间隔的文档,其中:
例如, |
| 文档 | 必需 | 将窗口开始时间之间的跳跃长度指定为时间
例如, |
| 阵列 | 必需 | 根据窗口内的消息对嵌套聚合管道进行评估。 |
| 文档 | Optional | 指定窗口边界相对于 UTC 的时间偏移的文档。 该文档是大小字段
例如, |
| 文档 | Optional | 该文档指定
如果设置 例如,考虑12 : 00下午 到1 : 00下午 窗口和 |
| 文档 | Optional | 文档,用于指定在处理窗口结束时间的文档后,将从源生成的窗口保持开放状态多长时间以接受延迟到达的数据。如果省略,则默认值为 3 秒。 |
行为
Atlas Stream Processing 仅支持每个管道一个窗口阶段。
当您将$group
阶段应用于窗口阶段时,单个组密钥的 RAM 限制为100 MB。
在 Windows 中,对某些聚合阶段的支持可能会受到限制或不可用。要了解更多信息,请参阅支持的聚合管道阶段。
如果服务中断,您可以从中断点的状态恢复窗口的内部管道。要了解更多信息,请参阅检查点。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与Apache Kafka代理建立连接,在名为my_weatherdata
的主题中收集这些报告,并在将每条记录摄取到后续聚合阶段时将其公开。$hoppingWindow
阶段定义了重叠的时间窗口,持续时间为 100 秒,每 20 秒开始一次。每个窗口都会执行一个内部pipeline
,根据从 Apache Kafka 代理流式传输的sample_weatherdata
文档中的定义,找出给定窗口持续时间内的平均liquidPrecipitation.depth
。然后,pipeline
将输出一个文档,其_id
等同于其所表示的窗口的开始时间戳和该窗口的averagePrecipitation
。$merge
阶段将输出写入sample_weatherstream
数据库中名为stream
的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { // The resulting document's _id is the $hoppingWindow's start timestamp _id: "$_stream_meta.window.start", averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
要查看生成的 sample_weatherstream.stream
集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:20.000Z'), end: ISODate('2024-08-28T19:32:00.000Z') } }, averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:40.000Z'), end: ISODate('2024-08-28T19:32:20.000Z') } }, averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:00.000Z'), end: ISODate('2024-08-28T19:32:40.000Z') } }, averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:20.000Z'), end: ISODate('2024-08-28T19:33:00.000Z') } }, averagePrecipitation: 2378.374061433447 }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。