Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$hoppingWindow

在此页面上

  • 定义
  • 语法
  • 行为
  • 示例

$hoppingWindow 阶段指定用于聚合数据的 跳跃窗口。Atlas Stream Processing 窗口是有状态的,可以在中断后恢复,并具有处理延迟到达的数据的机制。您必须在该窗口阶段中将所有其他聚合查询应用于流数据。

$hoppingWindow

$hoppingWindow 管道阶段采用以下原型形式:

{
"$hoppingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

$hoppingWindow 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

interval

文档

必需

以大小和时间单位的组合形式指定跳跃窗口间隔的文档,其中:

  • size的值必须是非零正整数。

  • unit的值可以是以下值之一:

    • "ms" (毫秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

例如, 20size和 {3 unit second将每个窗口设置为保持打开状态20秒。

hopSize

文档

必需

将窗口开始时间之间的跳跃长度指定为时间sizeunit的组合的文档,其中:

  • size的值必须是非零正整数。

  • unit的值可以是以下值之一:

    • "ms" (毫秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

例如,10sizesecondunit 定义了窗口开始时间之间 10 秒的跳跃。

pipeline

阵列

必需

根据窗口内的消息对嵌套聚合管道进行评估。

offset

文档

Optional

指定窗口边界相对于 UTC 的时间偏移的文档。 该文档是大小字段offsetFromUtc和时间单位的组合,其中:

  • offsetFromUtc的值必须是非零正整数。

  • unit的值必须是以下值之一:

    • "ms" (毫秒)

    • "second"

    • "minute"

    • "hour"

例如, 8offsetFromUtc和 {3 unit hour会生成比 UTC 提前八小时的边界。 如果不指定偏移量,窗口边界将与 UTC 对齐。

idleTimeout

文档

Optional

该文档指定 $source 空闲时关闭Windows之前的等待时间。 将此设置定义为时间sizeunit的组合,其中:

  • size的值必须是非零正整数。

  • unit的值可以是以下值之一:

    • "ms" (毫秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

如果设置 idleTimeout,则仅当 $source 空闲的时间超过剩余窗口时间或 idleTimeout 时间(以较长者为准)时, Atlas Stream Processing才会关闭打开的Windows 。 只要$source进入空闲状态,空闲计时器就会启动。

例如,考虑12 : 00下午 到1 : 00下午 窗口和idleTimeout时间2小时。 如果最后一个事件发生在下午12 : 02 ,之后$source进入空闲状态,则剩余窗口时间为58分钟。 Atlas Stream Processing 会在下午2 : 02空闲2小时后关闭窗口,该时间长于剩余窗口时间和idleTimeout时间。 如果idleTimeout时间仅设置为10分钟,则 Atlas Stream Processing 会在下午1 : 00空闲58分钟后关闭窗口,该时间长于idleTimeout时间,并且剩余窗口时间。

allowedLateness

文档

Optional

文档,用于指定在处理窗口结束时间的文档后,将从源生成的窗口保持开放状态多长时间以接受延迟到达的数据。如果省略,则默认值为 3 秒。

Atlas Stream Processing 仅支持每个管道一个窗口阶段。

当您将$group阶段应用于窗口阶段时,单个组密钥的 RAM 限制为100 MB。

在 Windows 中,对某些聚合阶段的支持可能会受到限制或不可用。要了解更多信息,请参阅支持的聚合管道阶段。

如果服务中断,您可以从中断点的状态恢复窗口的内部管道。要了解更多信息,请参阅检查点。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source阶段与Apache Kafka代理建立连接,在名为my_weatherdata的主题中收集这些报告,并在将每条记录摄取到后续聚合阶段时将其公开。

  2. $hoppingWindow 阶段定义了重叠的时间窗口,持续时间为 100 秒,每 20 秒开始一次。每个窗口都会执行一个内部 pipeline,根据从 Apache Kafka 代理流式传输的 sample_weatherdata 文档中的定义,找出给定窗口持续时间内的平均 liquidPrecipitation.depth。然后,pipeline 将输出一个文档,其 _id 等同于其所表示的窗口的开始时间戳和该窗口的 averagePrecipitation

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
// The resulting document's _id is the $hoppingWindow's start timestamp
_id: "$_stream_meta.window.start",
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:20.000Z'),
end: ISODate('2024-08-28T19:32:00.000Z')
}
},
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:40.000Z'),
end: ISODate('2024-08-28T19:32:20.000Z')
}
},
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:00.000Z'),
end: ISODate('2024-08-28T19:32:40.000Z')
}
},
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:20.000Z'),
end: ISODate('2024-08-28T19:33:00.000Z')
}
},
averagePrecipitation: 2378.374061433447
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$lookup