Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$tumblingWindow

項目一覧

  • 定義
  • 構文
  • 動作

$tumblingWindowステージでは、データ集計用のローリング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。

$tumblingWindow

$tumblingWindowパイプライン ステージには次のプロトタイプ形式があります。

{
"$tumblingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
size: <int>,
unit: "<unit-of-time>"
}
}
}

あるいは、$tumblingWindowパイプラインステージには、以下に示すように、整数値が 0 である allowedLateness フィールドと idleTimeout フィールドを含めることができます。

{
"$tumblingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": 0,
"allowedLateness": 0
}
}

$tumblingWindowステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

interval

ドキュメント

必須

ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかである必要があります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

たとえば、 20sizesecondunitの場合、各ウィンドウは20秒開いたままに設定されます。

pipeline

配列

必須

ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。

offset

ドキュメント

任意

UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールドoffsetFromUtcと時間単位の組み合わせであり、

  • offsetFromUtcの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかである必要があります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

たとえば、offsetFromUtc8unithour 、UTC の 8 時間先行する境界が生成されます。オフセットを指定しない場合、ウィンドウ境界は UTC に一致します。

idleTimeout

ドキュメント

任意

$source がアイドル状態の場合にWindowsを閉じるまでの待機時間を指定するドキュメント。 この設定を、 sizeunitの時間の組み合わせとして定義します。ここでは、

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかになります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

idleTimeoutを設定すると、$sourceが残りのウィンドウ時間または idleTimeout 時間よりも長い時間アイドル状態である場合にのみ、 Atlas Stream Processingは開いているWindowsを閉じます。 アイドル タイマーは、 $sourceがアイドル状態になるとすぐに開始されます。

たとえば、 12 : 00時から1 : 00時系列とidleTimeout時間と2時間の場合を考えてみましょう。 最後のイベントが12 : 02時、 $sourceがアイドル状態になった場合、残りのウィンドウ時間は58分です。 Atlas Stream Processing は、残りのウィンドウ時間およびidleTimeout時間よりも長い、 2 : 02時間のアイドル状態が2時間経過した後にウィンドウを閉じます。 idleTimeout時間が10分のみに設定されている場合、Atlas Stream Processing は、 1 : 00 pm()で58分間のアイドル状態を維持した後にウィンドウを閉じます。これはidleTimeout時間よりも長く、残りのウィンドウ時間。

あるいは、0 の整数値を使用してこの設定を定義することもできます。詳しくは、パイプラインの定義 を参照してください。

allowedLateness

ドキュメント

任意

ウィンドウ 終了時間のドキュメント処理後に遅延データを受け入れるために、ソースから生成されたWindowsを起動したままにしておく時間を指定するドキュメント。 省略した場合、デフォルトは3秒になります。

あるいは、0 の整数値を使用してこの設定を定義することもできます。詳しくは、パイプラインの定義 を参照してください。

Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。

ウィンドウ ステージに$groupステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。

Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。

サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。

  2. $tumblingWindow ステージは、30 秒の連続したウィンドウを定義します。各ウィンドウは内部 pipeline を実行し、そのウィンドウの時間の平均、中央値、最大、および最小 atmosphericPressureObservation.altimeterSetting.value を見つけます。次に、pipeline は、表示ウィンドウの開始タイムスタンプとそのウィンドウの指定された値と同等の _id を持つ単一のドキュメントを出力します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$tumblingWindow': {
interval: {
size: 30,
unit: "second"
},
pipeline: [{
$group: {
_id: "$_stream_meta.window.start",
averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
medianPressure: {
$median: {
input: "$atmosphericPressureObservation.altimeterSetting.value",
method: "approximate"
}
},
maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
}
}]
}
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-09-26T16:34:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-09-26T16:34:00.000Z'),
end: ISODate('2024-09-26T16:34:30.000Z')
}
},
averagePressure: 5271.47894736842,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
},
{
_id: ISODate('2024-09-26T16:34:30.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-09-26T16:34:30.000Z'),
end: ISODate('2024-09-26T16:35:00.000Z')
}
},
averagePressure: 5507.9,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$hoppingWindow