Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$hoppingWindow

이 페이지의 내용

  • 정의
  • 구문
  • 행동
  • 예시

$hoppingWindow 단계는 데이터 집계를 위한 호핑 창을 지정합니다. Atlas Stream Processing 창은 상태 저장형이며 중단된 경우 복구할 수 있고 늦게 도착한 데이터를 처리하는 메커니즘을 갖추고 있습니다. 이 창 단계 내에서 다른 모든 집계 쿼리를 스트리밍 데이터에 적용해야 합니다.

$hoppingWindow

$hoppingWindow 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.

{
"$hoppingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

$hoppingWindow 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
interval
문서
필수 사항

크기와 시간 단위의 조합으로 호핑 창의 간격을 지정하는 문서:

  • size 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나일 수 있습니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

    • "day"

예를 들어 의 size 20unitsecond 각 창이 20 초 동안 열린 상태로 유지되도록 설정합니다.

hopSize
문서
필수 사항

창 시작 시간 사이의 길이를 시간의 sizeunit 의 조합으로 지정하는 문서입니다.

  • size 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나일 수 있습니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

    • "day"

예를 들어 10sizesecondunit은 창 시작 시간 사이의 10초 간격을 정의합니다.

pipeline
배열
필수 사항
창 내의 메시지에 대해 평가된 중첩 집계 파이프라인입니다.
offset
문서
옵션

UTC를 기준으로 창 경계의 시간 오프셋을 지정하는 문서입니다. 문서는 크기 필드 offsetFromUtc 와 시간 단위의 조합으로, 여기서 다음과 같습니다.

  • offsetFromUtc 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나여야 합니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

예를 들어 8offsetFromUtchourunit 는 UTC보다 8시간 앞서 이동하는 경계를 생성합니다. 오프셋을 지정하지 않으면 창 경계가 UTC에 맞춰 정렬됩니다.

idleTimeout
문서
옵션

$source 이(가) 유휴 상태인 경우 Windows 를 닫기 전에 대기할 시간을 지정하는 문서입니다. 이 설정을 시간의 sizeunit 의 조합으로 정의합니다.

  • size 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나일 수 있습니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

    • "day"

idleTimeout을(를) 설정하면 $source 이(가) 남은 창 시간 또는 idleTimeout 시간 중 더 긴 시간 동안 유휴 상태인 경우에만 Atlas Stream Processing 이 열려 있는 Windows 을 닫습니다. $source 이(가) 유휴 상태가 되는 즉시 유휴 타이머가 시작됩니다.

예를 들어 12:00 오후부터 1:00 오후 기간과 idleTimeout 시간이 2 시간이라고 가정해 보겠습니다. 마지막 이벤트가 12:02 오후에 $source 이(가) 유휴 상태인 경우 남은 창 시간은 58 분입니다. Atlas Stream Processing은 남은 창 시간과 idleTimeout 시간보다 긴 2:02 오후에 2 시간의 유휴 시간 후 창을 닫습니다. idleTimeout 시간이 10 분으로만 설정된 경우, Atlas Stream Processing은 idleTimeout 시간보다 긴 1:00 오후에 58 분의 유휴 시간 후 창을 닫으며, 나머지 창은 시간.

allowedLateness
문서
옵션
창 종료 시간에 대해 문서를 처리한 후 늦게 도착한 데이터를 받기 위해 소스에서 생성된 을 얼마나 오래 열어둘지 지정하는 문서입니다. 생략하면 기본값은 3초입니다.

Atlas Stream Processing은 파이프라인당 하나의 윈도우 단계만 지원합니다.

$group 단계를 윈도우 단계에 적용하는 경우 단일 그룹 키 의 RAM 용량은 100 메가바이트로 제한됩니다.

특정 집계 단계에 대한 지원은 Windows 내에서 제한되거나 사용할 수 없을 수 있습니다. 자세한 내용은 지원되는 집계 파이프라인 단계를 참조하세요.

서비스 중단 시, 중단 시점에서 창의 내부 파이프라인을 재개할 수 있습니다 자세한 내용은 체크포인트를 참조하세요.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다.

  2. $hoppingWindow 단계는 100초 동안 지속되고 20초마다 시작되는 겹치는 시간 창을 정의합니다. 각 창은 특정 창의 기간 동안 Apache Kafka 브로커에서 스트리밍된 sample_weatherdata 문서에 정의된 대로, 평균 liquidPrecipitation.depth를 찾는 내부 pipeline을 실행합니다. 그런 다음, pipeline은 표현하는 창의 시작 타임스탬프에 해당하는 _id와 해당 창의 averagePrecipitation이 포함된 단일 문서를 출력합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
// The resulting document's _id is the $hoppingWindow's start timestamp
_id: "$_stream_meta.window.start",
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:20.000Z'),
end: ISODate('2024-08-28T19:32:00.000Z')
}
},
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:30:40.000Z'),
end: ISODate('2024-08-28T19:32:20.000Z')
}
},
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:00.000Z'),
end: ISODate('2024-08-28T19:32:40.000Z')
}
},
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-08-28T19:31:20.000Z'),
end: ISODate('2024-08-28T19:33:00.000Z')
}
},
averagePrecipitation: 2378.374061433447
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$lookup

이 페이지의 내용