$hoppingWindow

The $hoppingWindow stage specifies a hopping window for aggregation of data. Atlas Stream Processing windows are stateful, can be recovered if interrupted, and have mechanisms for processing late-arriving data. You must apply all other aggregation queries to your streaming data within this window stage.

A $hoppingWindow pipeline stage has the following prototype form:

{
  "$hoppingWindow": {
    "interval": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "hopSize": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "pipeline": [
      <aggregation-stage-array>
    ],
    "offset": {
      "offsetFromUtc": <int>,
      "unit": "<unit-of-time>"
    },
    "idleTimeout": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "allowedLateness": {
      "size": <int>,
      "unit": "<unit-of-time>"
    }
  }
}
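
As an illustration of the prototype above, the following sketch fills in the required fields with the example values used on this page: a 20-second interval and a 10-second hop. The inner pipeline, the sensorId and temperature field names, the 5-second allowedLateness, and the exampleHoppingWindow variable are hypothetical and stand in for whatever your stream contains.

```javascript
// Hypothetical example stage: 20-second windows that start every 10 seconds.
// The field names sensorId and temperature are assumptions for illustration.
const exampleHoppingWindow = {
  $hoppingWindow: {
    interval: { size: 20, unit: "second" },
    hopSize: { size: 10, unit: "second" },
    pipeline: [
      // Average the (assumed) temperature field per (assumed) sensorId
      // across the messages that fall inside each window.
      { $group: { _id: "$sensorId", avgTemperature: { $avg: "$temperature" } } }
    ],
    // Optional: keep each window open for 5 more seconds for late-arriving data.
    allowedLateness: { size: 5, unit: "second" }
  }
};
```

Because the hop in this sketch is shorter than the interval, consecutive windows overlap by 10 seconds, so a single message can contribute to more than one window.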

The $hoppingWindow stage takes a document with the following fields:

Field | Type | Necessity | Description
interval | document | Required

Document specifying the interval of a hopping window as a combination of a size and a unit of time where:

  • The value of size must be a non-zero positive integer.

  • The value of unit can be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

For example, a size of 20 and a unit of second sets each window to remain open for 20 seconds.

hopSize | document | Required

Document that specifies the length of the hop between window start times as a combination of a size and a unit of time where:

  • The value of size must be a non-zero positive integer.

  • The value of unit can be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

For example, a size of 10 and a unit of second defines a 10-second hop between window start times.

pipeline | array | Required

Nested aggregation pipeline evaluated against the messages within the window.

offset | document | Optional

Document specifying a time offset for window boundaries relative to UTC. The document is a combination of the size field offsetFromUtc and a unit of time where:

  • The value of offsetFromUtc must be a non-zero positive integer.

  • The value of unit must be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

For example, an offsetFromUtc of 8 and a unit of hour generates boundaries that are shifted eight hours ahead of UTC. If you do not specify an offset, the window boundaries align with UTC.

idleTimeout | document | Optional

Document specifying how long to wait before closing windows if $source is idle. Define this setting as a combination of a size and a unit of time where:

  • The value of size must be a non-zero positive integer.

  • The value of unit can be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

If you set idleTimeout, Atlas Stream Processing closes open windows only if $source is idle for longer than the greater of the remaining window time and the idleTimeout time. The idle timer starts as soon as $source goes idle.

For example, consider a 12:00 pm to 1:00 pm window and an idleTimeout of 2 hours. If the last event arrives at 12:02 pm and $source then goes idle, the remaining window time is 58 minutes. Because the idleTimeout of 2 hours is the greater of the two values, Atlas Stream Processing closes the window after 2 hours of idleness, at 2:02 pm. If the idleTimeout is instead only 10 minutes, the remaining window time of 58 minutes is the greater value, so Atlas Stream Processing closes the window after 58 minutes of idleness, at 1:00 pm.

allowedLateness | document | Optional

Document that specifies how long to keep windows open after their end time so that they can accept late-arriving data from the source. If omitted, defaults to 3 seconds.
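
The timing rules in the field descriptions above can be sketched as plain arithmetic. The following is a minimal illustration, not Atlas Stream Processing's implementation. It assumes that windows are half-open ranges [start, end) whose start times are multiples of hopSize counted from the Unix epoch with no offset, and that $source stays idle once it goes idle. The helper names toMs, windowsContaining, and idleCloseTime are hypothetical.

```javascript
// Milliseconds per unit, using the unit names listed on this page.
const UNIT_MS = { ms: 1, second: 1000, minute: 60000, hour: 3600000, day: 86400000 };

// Convert a { size, unit } document to milliseconds.
function toMs(duration) {
  const factor = UNIT_MS[duration.unit];
  if (factor === undefined || !Number.isInteger(duration.size) || duration.size <= 0) {
    throw new Error("size must be a positive integer and unit must be a listed unit");
  }
  return duration.size * factor;
}

// Assumption: windows are [start, end) and start on multiples of hopSize
// since the Unix epoch. Returns every window that contains the event time,
// ordered from earliest to latest start.
function windowsContaining(eventMs, interval, hopSize) {
  const size = toMs(interval);
  const hop = toMs(hopSize);
  const windows = [];
  // Latest window start that is less than or equal to the event time.
  let start = Math.floor(eventMs / hop) * hop;
  // Walk backwards while the window still covers the event.
  for (; start + size > eventMs; start -= hop) {
    windows.unshift({ start, end: start + size });
  }
  return windows;
}

// idleTimeout rule as described above: once $source goes idle, the window
// closes after the greater of the remaining window time and idleTimeout.
function idleCloseTime(lastEventMs, windowEndMs, idleTimeout) {
  const remaining = Math.max(windowEndMs - lastEventMs, 0);
  return lastEventMs + Math.max(remaining, toMs(idleTimeout));
}
```

Under these assumptions, an event at 25 seconds with a 20-second interval and a 10-second hopSize falls into two windows, [10 s, 30 s) and [20 s, 40 s). For the idleTimeout example above, with times expressed in milliseconds after 12:00 pm, idleCloseTime(2 * 60000, 60 * 60000, { size: 2, unit: "hour" }) corresponds to 2:02 pm, and the same call with { size: 10, unit: "minute" } corresponds to 1:00 pm.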

Atlas Stream Processing supports only one window stage per pipeline.
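
A pipeline definition can be checked against this constraint before you submit it. The sketch below is a hypothetical client-side helper, not part of any Atlas API. It assumes that the pipeline is an array of stage documents and that the window stages are $hoppingWindow and $tumblingWindow; that list is an assumption and may be incomplete for your deployment.

```javascript
// Assumed list of window stage names; extend it if your deployment supports others.
const WINDOW_STAGES = ["$hoppingWindow", "$tumblingWindow"];

// Count the top-level window stages in a pipeline (an array of stage documents).
function countWindowStages(pipeline) {
  return pipeline.filter(
    (stage) => Object.keys(stage).some((name) => WINDOW_STAGES.includes(name))
  ).length;
}

// Throw if the pipeline declares more than one window stage; otherwise return it.
function assertSingleWindowStage(pipeline) {
  const count = countWindowStages(pipeline);
  if (count > 1) {
    throw new Error(`expected at most one window stage, found ${count}`);
  }
  return pipeline;
}
```

The helper only inspects top-level stages, which is enough to catch a pipeline that declares a second window stage alongside the first.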

When you use the $group stage inside your window stage, each group key is limited to 100 megabytes of RAM.

Support for certain aggregation stages might be limited or unavailable within windows. To learn more, see Supported Aggregation Pipeline Stages.

In the event of a service interruption, you can resume the internal pipeline of a window from its state at the point of interruption. To learn more, see Checkpoints.
