
$tumblingWindow


The $tumblingWindow stage specifies a tumbling window for aggregation of data. Atlas Stream Processing windows are stateful, can be recovered if interrupted, and have mechanisms for processing late-arriving data. You must apply all other aggregation queries to your streaming data within this window stage.

$tumblingWindow

A $tumblingWindow pipeline stage has the following prototype form:

{
  "$tumblingWindow": {
    "interval": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "pipeline": [
      <aggregation-stage-array>
    ],
    "offset": {
      "offsetFromUtc": <int>,
      "unit": "<unit-of-time>"
    },
    "idleTimeout": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "allowedLateness": {
      "size": <int>,
      "unit": "<unit-of-time>"
    }
  }
}

Alternatively, a $tumblingWindow pipeline stage can have allowedLateness and idleTimeout fields with an integer value of 0, as shown below:

{
  "$tumblingWindow": {
    "interval": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "pipeline": [
      <aggregation-stage-array>
    ],
    "offset": {
      "offsetFromUtc": <int>,
      "unit": "<unit-of-time>"
    },
    "idleTimeout": 0,
    "allowedLateness": 0
  }
}
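
The prototypes above use placeholders. As a minimal sketch, the following fills them in with hypothetical values: a one-minute window, a 30-second idleTimeout, and a 5-second allowedLateness. The variable name tumblingWindowStage and the count field are illustrative assumptions, not part of the stage definition.

```javascript
// Hypothetical values chosen only to illustrate the shape of the stage.
const tumblingWindowStage = {
  $tumblingWindow: {
    // Each window stays open for one minute.
    interval: { size: 1, unit: "minute" },
    // Inner pipeline: count the messages that fall in each window.
    pipeline: [
      { $group: { _id: "$_stream_meta.window.start", count: { $sum: 1 } } }
    ],
    // Optional fields, written in the document form.
    idleTimeout: { size: 30, unit: "second" },
    allowedLateness: { size: 5, unit: "second" }
  }
};
```

Either optional field could instead be written as the integer 0, as in the second prototype.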

The $tumblingWindow stage takes a document with the following fields:

Field
Type
Necessity
Description

interval

document

Required

Document specifying the interval of a tumbling window as a combination of a size and a unit of time where:

  • The value of size must be a non-zero positive integer.

  • The value of unit must be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

For example, a size of 20 and a unit of second sets each window to remain open for 20 seconds.
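
To make the interval concrete, the following sketch converts an interval document to milliseconds and finds the window that contains a given timestamp. It assumes that window boundaries fall on multiples of the interval counted from the Unix epoch, which is consistent with the sample output later on this page but is not stated by the field description. The names UNIT_MS, intervalToMs, and windowBounds are hypothetical helpers, not part of Atlas Stream Processing.

```javascript
// Milliseconds per unit, using the unit names listed above.
const UNIT_MS = new Map([
  ["ms", 1],
  ["second", 1000],
  ["minute", 60 * 1000],
  ["hour", 60 * 60 * 1000],
  ["day", 24 * 60 * 60 * 1000],
]);

// Convert an interval document such as { size: 20, unit: "second" } to milliseconds.
function intervalToMs(interval) {
  if (!Number.isInteger(interval.size) || interval.size <= 0) {
    throw new RangeError("size must be a non-zero positive integer");
  }
  if (!UNIT_MS.has(interval.unit)) {
    throw new RangeError(`unsupported unit: ${interval.unit}`);
  }
  return interval.size * UNIT_MS.get(interval.unit);
}

// Assumption: windows are aligned to the Unix epoch, so a timestamp belongs to
// the window whose start is the largest multiple of the interval that does not
// exceed the timestamp.
function windowBounds(timestamp, interval) {
  const sizeMs = intervalToMs(interval);
  const start = Math.floor(timestamp.getTime() / sizeMs) * sizeMs;
  return { start: new Date(start), end: new Date(start + sizeMs) };
}

const bounds = windowBounds(
  new Date("2024-09-26T16:34:12.000Z"),
  { size: 30, unit: "second" }
);
console.log(bounds.start.toISOString(), bounds.end.toISOString());
```

Under that alignment assumption, an event at 16:34:12 UTC lands in the 30-second window that starts at 16:34:00 and ends at 16:34:30.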

pipeline

array

Required

Nested aggregation pipeline evaluated against the messages within the window.

offset

document

Optional

Document specifying a time offset for window boundaries relative to UTC. The document is a combination of the size field offsetFromUtc and a unit of time where:

  • The value of offsetFromUtc must be a non-zero positive integer.

  • The value of unit must be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

For example, an offsetFromUtc of 8 and a unit of hour generates boundaries that are shifted eight hours ahead of UTC. If you do not specify an offset, the window boundaries align with UTC.
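
The effect of an offset on window boundaries can be sketched as follows. This is an illustration under two assumptions: windows without an offset start at multiples of the interval counted from the Unix epoch, and a positive offset moves every boundary later by the offset amount. The function name windowStartWithOffset is hypothetical.

```javascript
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Assumption: the offset moves every boundary by the same amount, so windows
// start at (multiple of size) + offset rather than at a multiple of size.
function windowStartWithOffset(timestampMs, sizeMs, offsetMs = 0) {
  return Math.floor((timestampMs - offsetMs) / sizeMs) * sizeMs + offsetMs;
}

const eventMs = Date.parse("2024-09-26T16:34:12.000Z");

// One-day windows with no offset: boundaries at midnight UTC.
const alignedStart = new Date(windowStartWithOffset(eventMs, DAY_MS));

// One-day windows with offsetFromUtc of 8 and unit "hour": boundaries at 08:00 UTC.
const shiftedStart = new Date(windowStartWithOffset(eventMs, DAY_MS, 8 * HOUR_MS));

console.log(alignedStart.toISOString(), shiftedStart.toISOString());
```

With these assumptions, the same event falls in a window starting at 00:00 UTC without an offset and at 08:00 UTC with the eight-hour offset.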

idleTimeout

document

Optional

Document specifying how long to wait before closing windows if $source is idle. Define this setting as a combination of a size and a unit of time where:

  • The value of size must be a non-zero positive integer.

  • The value of unit can be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

If you set idleTimeout, Atlas Stream Processing closes open windows only if $source is idle for longer than the greater of either the remaining window time or idleTimeout time. The idle timer starts as soon as $source goes idle.

For example, consider a 12:00 pm to 1:00 pm window and an idleTimeout of 2 hours. If the last event arrives at 12:02 pm and $source then goes idle, the remaining window time is 58 minutes. Because idleTimeout (2 hours) is greater than the remaining window time, Atlas Stream Processing closes the window after 2 hours of idleness, at 2:02 pm. If idleTimeout is instead set to 10 minutes, the remaining window time (58 minutes) is the greater value, so Atlas Stream Processing closes the window after 58 minutes of idleness, at 1:00 pm.

Alternatively, you can define this setting with an integer value of 0. See the pipeline definition for more information.
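
The idleTimeout rule can be expressed as a small calculation: the window closes once $source has been idle for the greater of the remaining window time and the idleTimeout. The sketch below reproduces the two cases from the example, assuming the idle timer starts at the last event and using a hypothetical date in UTC. The function name idleCloseTime is an illustrative helper, not an Atlas Stream Processing API.

```javascript
const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;

// Time at which an open window closes if $source stays idle after the last event.
// Assumption: the idle timer starts at the moment of the last event.
function idleCloseTime(windowEndMs, lastEventMs, idleTimeoutMs) {
  const remainingMs = Math.max(windowEndMs - lastEventMs, 0);
  return lastEventMs + Math.max(remainingMs, idleTimeoutMs);
}

// Hypothetical date; the window runs from 12:00 to 13:00 and the last event is at 12:02.
const windowEndMs = Date.parse("2024-01-01T13:00:00.000Z");
const lastEventMs = Date.parse("2024-01-01T12:02:00.000Z");

// idleTimeout of 2 hours: the timeout is the greater value.
const closeWithLongTimeout = new Date(idleCloseTime(windowEndMs, lastEventMs, 2 * HOUR_MS));

// idleTimeout of 10 minutes: the remaining 58 minutes is the greater value.
const closeWithShortTimeout = new Date(idleCloseTime(windowEndMs, lastEventMs, 10 * MINUTE_MS));

console.log(closeWithLongTimeout.toISOString(), closeWithShortTimeout.toISOString());
```

The first case closes at 14:02 and the second at 13:00, matching the 2:02 pm and 1:00 pm results described for idleTimeout.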

allowedLateness

document

Optional

Document that specifies how long to keep windows open to accept late-arriving data after Atlas Stream Processing has processed documents up to the window end time. If omitted, defaults to 3 seconds.

Alternatively, you can define this setting with an integer value of 0. See the pipeline definition for more information.
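
One way to picture allowedLateness is as a grace period added to the window end time. The sketch below is a simplified model, not a description of the service's internals: it assumes that progress is tracked by the latest event time seen so far, and that a window stops accepting data once that time reaches the window end plus allowedLateness. The names isWindowOpen and DEFAULT_ALLOWED_LATENESS_MS are hypothetical.

```javascript
// Default stated for allowedLateness: 3 seconds.
const DEFAULT_ALLOWED_LATENESS_MS = 3000;

// Simplified model (assumption): a window accepts data until the latest event
// time seen so far reaches windowEnd + allowedLateness.
function isWindowOpen(windowEndMs, latestSeenMs, allowedLatenessMs = DEFAULT_ALLOWED_LATENESS_MS) {
  return latestSeenMs < windowEndMs + allowedLatenessMs;
}

const windowEnd = Date.parse("2024-09-26T16:34:30.000Z");

// Two seconds past the window end: still inside the default 3-second grace period.
console.log(isWindowOpen(windowEnd, windowEnd + 2000));

// Three seconds past the window end: the grace period has elapsed.
console.log(isWindowOpen(windowEnd, windowEnd + 3000));

// allowedLateness of 0: the window closes as soon as the end time is reached.
console.log(isWindowOpen(windowEnd, windowEnd, 0));
```

In this model, a larger allowedLateness trades output latency for a better chance of including out-of-order events.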

Atlas Stream Processing supports only one window stage per pipeline.

When you apply the $group stage to your window stage, a single group key has a limit of 100 megabytes of RAM.

Support for certain aggregation stages might be limited or unavailable within windows. To learn more, see Supported Aggregation Pipeline Stages.

In the event of a service interruption, you can resume the internal pipeline of a window from its state at the point of interruption. To learn more, see Checkpoints.

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages.

  2. The $tumblingWindow stage defines consecutive windows with 30-second duration. Each window executes an internal pipeline, which finds the average, median, maximum, and minimum atmosphericPressureObservation.altimeterSetting.value for the duration of that window. The pipeline then outputs a single document with an _id equivalent to the start timestamp of the window it represents and the specified values for that window.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$tumblingWindow': {
    interval: {
      size: 30,
      unit: "second"
    },
    pipeline: [{
      $group: {
        _id: "$_stream_meta.window.start",
        averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
        medianPressure: {
          $median: {
            input: "$atmosphericPressureObservation.altimeterSetting.value",
            method: "approximate"
          }
        },
        maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
        minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
      }
    }]
  }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()

{
  _id: ISODate('2024-09-26T16:34:00.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-09-26T16:34:00.000Z'),
      end: ISODate('2024-09-26T16:34:30.000Z')
    }
  },
  averagePressure: 5271.47894736842,
  maxPressure: 9999.9,
  medianPressure: 1015.9,
  minPressure: 1015.9
},
{
  _id: ISODate('2024-09-26T16:34:30.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-09-26T16:34:30.000Z'),
      end: ISODate('2024-09-26T16:35:00.000Z')
    }
  },
  averagePressure: 5507.9,
  maxPressure: 9999.9,
  medianPressure: 1015.9,
  minPressure: 1015.9
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
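
As a rough local illustration of what the $tumblingWindow stage in this example computes, the following sketch groups a few hypothetical readings into 30-second windows and derives the average, maximum, and minimum for each. It runs in plain JavaScript with no Atlas connection, assumes epoch-aligned windows, and omits the median for brevity. The readings array and the summarizeByWindow function are invented for illustration.

```javascript
// Hypothetical readings: an event time and an altimeter setting value.
const readings = [
  { ts: "2024-09-26T16:34:05.000Z", value: 1015.9 },
  { ts: "2024-09-26T16:34:20.000Z", value: 1017.1 },
  { ts: "2024-09-26T16:34:40.000Z", value: 1016.5 },
];

const WINDOW_MS = 30 * 1000;

// Assign each reading to its window, then summarize each window,
// similar in spirit to the $group stage in the inner pipeline.
function summarizeByWindow(items, sizeMs) {
  const groups = new Map();
  for (const { ts, value } of items) {
    // Assumption: windows start at multiples of the interval since the Unix epoch.
    const start = Math.floor(Date.parse(ts) / sizeMs) * sizeMs;
    if (!groups.has(start)) groups.set(start, []);
    groups.get(start).push(value);
  }
  return [...groups.entries()]
    .sort(([a], [b]) => a - b)
    .map(([start, values]) => ({
      _id: new Date(start).toISOString(),
      averagePressure: values.reduce((sum, v) => sum + v, 0) / values.length,
      maxPressure: Math.max(...values),
      minPressure: Math.min(...values),
    }));
}

const summaries = summarizeByWindow(readings, WINDOW_MS);
console.log(summaries);
```

The first two readings share the window that starts at 16:34:00, and the third falls in the window that starts at 16:34:30, so the sketch produces one summary document per window.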
