$tumblingWindow
定義
$tumblingWindow
ステージでは、データ集計用のローリング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。
$tumblingWindow
$tumblingWindow
パイプライン ステージには次のプロトタイプ形式があります。{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } あるいは、
$tumblingWindow
パイプラインステージには、以下に示すように、整数値が 0 であるallowedLateness
フィールドとidleTimeout
フィールドを含めることができます。{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
構文
$tumblingWindow
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| ドキュメント | 必須 | ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。
たとえば、 |
| 配列 | 必須 | ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。 |
| ドキュメント | 任意 | UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールド
たとえば、 |
| ドキュメント | 任意 |
たとえば、 12 : 00時から1 : 00時系列と |
| ドキュメント | 任意 | ウィンドウ 終了時間のドキュメント処理後に遅延データを受け入れるために、ソースから生成されたWindowsを起動したままにしておく時間を指定するドキュメント。 省略した場合、デフォルトは3秒になります。 |
動作
Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。
ウィンドウ ステージに$group
ステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。
Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。
サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。$tumblingWindow
ステージは、30 秒の連続したウィンドウを定義します。各ウィンドウは内部pipeline
を実行し、そのウィンドウの時間の平均、中央値、最大、および最小atmosphericPressureObservation.altimeterSetting.value
を見つけます。次に、pipeline
は、表示ウィンドウの開始タイムスタンプとそのウィンドウの指定された値と同等の_id
を持つ単一のドキュメントを出力します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: "$_stream_meta.window.start", averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:00.000Z'), end: ISODate('2024-09-26T16:34:30.000Z') } }, averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:30.000Z'), end: ISODate('2024-09-26T16:35:00.000Z') } }, averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。