$tumblingWindow
정의
$tumblingWindow
단계는 데이터 집계를 위한 텀블링 창을 지정합니다. Atlas Stream Processing 창은 상태 저장형이며 중단된 경우 복구할 수 있고 늦게 도착한 데이터를 처리하는 메커니즘을 갖추고 있습니다. 이 창 단계 내에서 다른 모든 집계 쿼리를 스트리밍 데이터에 적용 해야 합니다.
$tumblingWindow
$tumblingWindow
파이프라인 단계의 프로토타입 형식은 다음과 같습니다.{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } 또는
$tumblingWindow
파이프라인 단계에 아래와 같이 정수 값이 0인allowedLateness
및idleTimeout
필드가 있을 수 있습니다.{ "$tumblingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
구문
$tumblingWindow
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문서 | 필수 사항 | 크기와 시간 단위의 조합으로 호핑 창의 간격을 지정하는 문서:
예를 들어 의 |
| 배열 | 필수 사항 | 창 내의 메시지에 대해 평가된 중첩 집계 파이프라인입니다. |
| 문서 | 옵션 | UTC를 기준으로 창 경계의 시간 오프셋을 지정하는 문서입니다. 문서는 크기 필드
예를 들어 |
| 문서 | 옵션 |
예를 들어 12:00 오후부터 1:00 오후 기간과 또는 정수 값 을 사용하여 이 설정을 정의할 수 0 있습니다. 자세한 내용은 파이프라인 정의를 참조하세요. |
| 문서 | 옵션 |
행동
Atlas Stream Processing은 파이프라인당 하나의 윈도우 단계만 지원합니다.
$group
단계를 윈도우 단계에 적용하는 경우 단일 그룹 키 의 RAM 용량은 100 메가바이트로 제한됩니다.
특정 집계 단계에 대한 지원은 Windows 내에서 제한되거나 사용할 수 없을 수 있습니다. 자세한 내용은 지원되는 집계 파이프라인 단계를 참조하세요.
서비스 중단 시, 중단 시점에서 창의 내부 파이프라인을 재개할 수 있습니다 자세한 내용은 체크포인트를 참조하세요.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다.$tumblingWindow
단계는 30초 동안 지속되는 연속 창을 정의합니다. 각 창은 내부pipeline
을 실행하여 해당 창의 기간 동안 평균, 중앙값, 최대값 및 최소값atmosphericPressureObservation.altimeterSetting.value
를 찾습니다. 그런 다음,pipeline
은 표현하는 창의 시작 타임스탬프에 해당하는_id
와 해당 창에 대해 지정된 값이 포함된 단일 문서를 출력합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: "$_stream_meta.window.start", averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:00.000Z'), end: ISODate('2024-09-26T16:34:30.000Z') } }, averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-09-26T16:34:30.000Z'), end: ISODate('2024-09-26T16:35:00.000Z') } }, averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.