$hoppingWindow
정의
$hoppingWindow
단계는 데이터 집계를 위한 호핑 창을 지정합니다. Atlas Stream Processing 창은 상태 저장형이며 중단된 경우 복구할 수 있고 늦게 도착한 데이터를 처리하는 메커니즘을 갖추고 있습니다. 이 창 단계 내에서 다른 모든 집계 쿼리를 스트리밍 데이터에 적용해야 합니다.
$hoppingWindow
$hoppingWindow
파이프라인 단계의 프로토타입 형식은 다음과 같습니다.{ "$hoppingWindow": { "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
구문
$hoppingWindow
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문서 | 필수 사항 | 크기와 시간 단위의 조합으로 호핑 창의 간격을 지정하는 문서:
예를 들어 의 |
| 문서 | 필수 사항 | 창 시작 시간 사이의 홉 길이를 시간의
예를 들어 |
| 배열 | 필수 사항 | 창 내의 메시지에 대해 평가된 중첩 집계 파이프라인입니다. |
| 문서 | 옵션 | UTC를 기준으로 창 경계의 시간 오프셋을 지정하는 문서입니다. 문서는 크기 필드
예를 들어 |
| 문서 | 옵션 |
예를 들어 12:00 오후부터 1:00 오후 기간과 |
| 문서 | 옵션 | 창 종료 시간에 대해 문서를 처리한 후 늦게 도착한 데이터를 받기 위해 소스에서 생성된 창을 얼마나 오래 열어둘지 지정하는 문서입니다. 생략하면 기본값은 3초입니다. |
행동
Atlas Stream Processing은 파이프라인당 하나의 윈도우 단계만 지원합니다.
$group
단계를 윈도우 단계에 적용하는 경우 단일 그룹 키 의 RAM 용량은 100 메가바이트로 제한됩니다.
특정 집계 단계에 대한 지원은 Windows 내에서 제한되거나 사용할 수 없을 수 있습니다. 자세한 내용은 지원되는 집계 파이프라인 단계를 참조하세요.
서비스 중단 시, 중단 시점에서 창의 내부 파이프라인을 재개할 수 있습니다 자세한 내용은 체크포인트를 참조하세요.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다.$hoppingWindow
단계는 100초 동안 지속되고 20초마다 시작되는 겹치는 시간 창을 정의합니다. 각 창은 특정 창의 기간 동안 Apache Kafka 브로커에서 스트리밍된sample_weatherdata
문서에 정의된 대로, 평균liquidPrecipitation.depth
를 찾는 내부pipeline
을 실행합니다. 그런 다음,pipeline
은 표현하는 창의 시작 타임스탬프에 해당하는_id
와 해당 창의averagePrecipitation
이 포함된 단일 문서를 출력합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { // The resulting document's _id is the $hoppingWindow's start timestamp _id: "$_stream_meta.window.start", averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:20.000Z'), end: ISODate('2024-08-28T19:32:00.000Z') } }, averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:30:40.000Z'), end: ISODate('2024-08-28T19:32:20.000Z') } }, averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:00.000Z'), end: ISODate('2024-08-28T19:32:40.000Z') } }, averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), _stream_meta: { source: { type: 'kafka' }, window: { start: ISODate('2024-08-28T19:31:20.000Z'), end: ISODate('2024-08-28T19:33:00.000Z') } }, averagePrecipitation: 2378.374061433447 }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.