$hoppingWindow
定義
$hoppingWindow
ステージでは、データ集計用のホスティング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。
$hoppingWindow
$hoppingWindow
パイプライン ステージには次のプロトタイプ形式があります。{ "$hoppingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
構文
$hoppingWindow
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
interval | ドキュメント | 必須 | ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。
たとえば、 |
hopSize | ドキュメント | 必須 | ウィンドウの開始時間間の経過時間の長さを、
たとえば、 |
pipeline | 配列 | 必須 | ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。 |
offset | ドキュメント | 任意 | UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールド
たとえば、 |
idleTimeout | ドキュメント | 任意 |
たとえば、 12 : 00時から1 : 00時系列と |
allowedLateness | ドキュメント | 任意 | ウィンドウ 終了時間のドキュメント処理後に遅延データを受け入れるために、ソースから生成されたWindowsを起動したままにしておく時間を指定するドキュメント。 省略した場合、デフォルトは3秒になります。 |
動作
Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。
ウィンドウ ステージに$group
ステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。
Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。
サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。$hoppingWindow
ステージでは、期間が 100 秒で、20 秒ごとに開始される重複する時間のWindowsを定義します。 各ウィンドウは内部pipeline
liquidPrecipitation.depth
sample_weatherdata
を実行し、 Apache Kafka からストリーミングされた ドキュメントで定義された平均 を見つけます。 特定のウィンドウの期間にわたって実行されるよう保証します。次に、pipeline
は、表すウィンドウの開始タイムスタンプと同等の_id
とそのウィンドウのaveragePrecipitation
を含む単一のドキュメントを出力します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { // The resulting document's _id is the $hoppingWindow's start timestamp _id: "$_stream_meta.window.start", averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:20.000Z'), end: ISODate('2024-08-28T19:32:00.000Z') } }, averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:40.000Z'), end: ISODate('2024-08-28T19:32:20.000Z') } }, averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:00.000Z'), end: ISODate('2024-08-28T19:32:40.000Z') } }, averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:20.000Z'), end: ISODate('2024-08-28T19:33:00.000Z') } }, averagePrecipitation: 2378.374061433447 }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。