Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$hoppingWindow

項目一覧

  • 定義
  • 構文
  • 動作

$hoppingWindowステージでは、データ集計用のホスティング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。

$hoppingWindow

$hoppingWindowパイプライン ステージには次のプロトタイプ形式があります。

{
"$hoppingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

$hoppingWindowステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
interval
ドキュメント
必須

ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかになります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

たとえば、 20sizesecondunitの場合、各ウィンドウは20秒開いたままに設定されます。

hopSize
ドキュメント
必須

ウィンドウの開始時間間の経過時間の長さを、 sizeunitの時間の組み合わせとして指定するドキュメント。ここでは、

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかになります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

たとえば、 size10で、かつunitsecondの場合、ウィンドウ起動時間間の10秒のホ アップ が定義されます。

pipeline
配列
必須
ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。
offset
ドキュメント
任意

UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールドoffsetFromUtcと時間単位の組み合わせであり、

  • offsetFromUtcの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかである必要があります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

たとえば、offsetFromUtc8unithour 、UTC の 8 時間先行する境界が生成されます。オフセットを指定しない場合、ウィンドウ境界は UTC に一致します。

idleTimeout
ドキュメント
任意

$source がアイドル状態の場合にWindowsを閉じるまでの待機時間を指定するドキュメント。 この設定を、 sizeunitの時間の組み合わせとして定義します。ここでは、

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかになります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

idleTimeoutを設定すると、$sourceが残りのウィンドウ時間または idleTimeout 時間よりも長い時間アイドル状態である場合にのみ、 Atlas Stream Processingは開いているWindowsを閉じます。 アイドル タイマーは、 $sourceがアイドル状態になるとすぐに開始されます。

たとえば、 12 : 00時から1 : 00時系列とidleTimeout時間と2時間の場合を考えてみましょう。 最後のイベントが12 : 02時、 $sourceがアイドル状態になった場合、残りのウィンドウ時間は58分です。 Atlas Stream Processing は、残りのウィンドウ時間およびidleTimeout時間よりも長い、 2 : 02時間のアイドル状態が2時間経過した後にウィンドウを閉じます。 idleTimeout時間が10分のみに設定されている場合、Atlas Stream Processing は、 1 : 00 pm()で58分間のアイドル状態を維持した後にウィンドウを閉じます。これはidleTimeout時間よりも長く、残りのウィンドウ時間。

allowedLateness
ドキュメント
任意
ウィンドウ 終了時間のドキュメント処理後に遅延データを受け入れるために、ソースから生成されたWindowsを起動したままにしておく時間を指定するドキュメント。 省略した場合、デフォルトは3秒になります。

Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。

ウィンドウ ステージに$groupステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。

Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。

サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。

  2. $hoppingWindowステージでは、期間が 100 秒で、20 秒ごとに開始される重複する時間のWindowsを定義します。 各ウィンドウは内部pipeline liquidPrecipitation.depthsample_weatherdataを実行し、 Apache Kafka からストリーミングされた ドキュメントで定義された平均 を見つけます。 特定のウィンドウの期間にわたって実行されるよう保証します。次に、 pipelineは、表すウィンドウの開始タイムスタンプと同等の_idとそのウィンドウのaveragePrecipitationを含む単一のドキュメントを出力します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
// The resulting document's _id is the $hoppingWindow's start timestamp
_id: "$_stream_meta.window.start",
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:20.000Z'),
end: ISODate('2024-08-28T19:32:00.000Z')
}
},
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:40.000Z'),
end: ISODate('2024-08-28T19:32:20.000Z')
}
},
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:00.000Z'),
end: ISODate('2024-08-28T19:32:40.000Z')
}
},
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:20.000Z'),
end: ISODate('2024-08-28T19:33:00.000Z')
}
},
averagePrecipitation: 2378.374061433447
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$lookup