$tumblingWindow
On this page
Definition
The $tumblingWindow
stage specifies a
tumbling window for aggregation of data.
Atlas Stream Processing windows are stateful, can be recovered if interrupted,
and have mechanisms for processing late-arriving data. You must apply
all other aggregation queries to your streaming data within this
window stage.
$tumblingWindow
A
$tumblingWindow
pipeline stage has the following prototype form:{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } Alternatively, a
$tumblingWindow
pipeline stage can haveallowedLateness
andidleTimeout
fields with an integer value of 0, as shown below:{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
Syntax
The $tumblingWindow
stage takes a document with the following
fields:
Field | Type | Necessity | Description |
---|---|---|---|
| document | Required | Document specifying the interval of a hopping window as a combination of a size and a unit of time where:
For example, a |
| array | Required | Nested aggregation pipeline evaluated against the messages within the window. |
| document | Optional | Document specifying a time offset for window boundaries relative
to UTC. The document is a combination of the size field
For example, an |
| document | Optional | Document specifying how long to wait before closing windows if
If you set For example, consider a 12:00 pm to 1:00 pm window and
Alternatively, you can define this setting with an integer value of 0. See the pipeline defintion for more information. |
| document | Optional | Document that specifies how long to keep windows generated from the source open to accept late-arriving data after processing documents for window end time. If omitted, defaults to 3 seconds. Alternatively, you can define this setting with an integer value of 0. See the pipeline defintion for more information. |
Behavior
Atlas Stream Processing supports only one window stage per pipeline.
When you apply the $group
stage to your window stage,
a single group key
has a limit of 100 megabytes of RAM.
Support for certain aggregation stages might be limited or unavailable within windows. To learn more, see Supported Aggregation Pipeline Stages.
In the event of a service interruption, you can resume the internal pipeline of a window from its state at the point of interruption. To learn more, see Checkpoints.
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
The
$source
stage establishes a connection with the Apache Kafka broker collecting these reports in a topic namedmy_weatherdata
, exposing each record as it is ingested to the subsequent aggregation stages.The
$tumblingWindow
stage defines consecutive windows with 30-second duration. Each window executes an internalpipeline
, which finds the average, median, maximum, and minimumatmosphericPressureObservation.altimeterSetting.value
for the duration of that window. Thepipeline
then outputs a single document with an_id
equivalent to the start timestamp of the window it represents and the specified values for that window.The
$merge
stage writes the output to an Atlas collection namedstream
in thesample_weatherstream
database. If no such database or collection exist, Atlas creates them.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: "$_stream_meta.window.start", averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
To view the documents in the resulting sample_weatherstream.stream
collection, connect to your Atlas cluster and run the following
command:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:00.000Z'), end: ISODate('2024-09-26T16:34:30.000Z') } }, averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:30.000Z'), end: ISODate('2024-09-26T16:35:00.000Z') } }, averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.