Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$tumblingWindow

이 페이지의 내용

  • 정의
  • 구문
  • 행동
  • 예시

$tumblingWindow 단계는 데이터 집계를 위한 텀블링 창을 지정합니다. Atlas Stream Processing 창은 상태 저장형이며 중단된 경우 복구할 수 있고 늦게 도착한 데이터를 처리하는 메커니즘을 갖추고 있습니다. 이 창 단계 내에서 다른 모든 집계 쿼리를 스트리밍 데이터에 적용 해야 합니다.

$tumblingWindow

$tumblingWindow 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.

{
"$tumblingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
size: <int>,
unit: "<unit-of-time>"
}
}
}

또는 $tumblingWindow 파이프라인 단계에 아래와 같이 정수 값이 0인 allowedLatenessidleTimeout 필드가 있을 수 있습니다.

{
"$tumblingWindow": {
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": 0,
"allowedLateness": 0
}
}

$tumblingWindow 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

interval

문서

필수 사항

크기와 시간 단위의 조합으로 호핑 창의 간격을 지정하는 문서:

  • size 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나여야 합니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

    • "day"

예를 들어 의 size 20unitsecond 각 창이 20 초 동안 열린 상태로 유지되도록 설정합니다.

pipeline

배열

필수 사항

창 내의 메시지에 대해 평가된 중첩 집계 파이프라인입니다.

offset

문서

옵션

UTC를 기준으로 창 경계의 시간 오프셋을 지정하는 문서입니다. 문서는 크기 필드 offsetFromUtc 와 시간 단위의 조합으로, 여기서 다음과 같습니다.

  • offsetFromUtc 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나여야 합니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

예를 들어 8offsetFromUtchourunit 는 UTC보다 8시간 앞서 이동하는 경계를 생성합니다. 오프셋을 지정하지 않으면 창 경계가 UTC에 맞춰 정렬됩니다.

idleTimeout

문서

옵션

$source 이(가) 유휴 상태인 경우 Windows 를 닫기 전에 대기할 시간을 지정하는 문서입니다. 이 설정을 시간의 sizeunit 의 조합으로 정의합니다.

  • size 값은 0이 아닌 양의 정수여야 합니다.

  • unit 의 값은 다음 중 하나일 수 있습니다.

    • "ms" (밀리초)

    • "second"

    • "minute"

    • "hour"

    • "day"

idleTimeout을(를) 설정하면 $source 이(가) 남은 창 시간 또는 idleTimeout 시간 중 더 긴 시간 동안 유휴 상태인 경우에만 Atlas Stream Processing 이 열려 있는 Windows 을 닫습니다. $source 이(가) 유휴 상태가 되는 즉시 유휴 타이머가 시작됩니다.

예를 들어 12:00 오후부터 1:00 오후 기간과 idleTimeout 시간이 2 시간이라고 가정해 보겠습니다. 마지막 이벤트가 12:02 오후에 $source 이(가) 유휴 상태인 경우 남은 창 시간은 58 분입니다. Atlas Stream Processing은 남은 창 시간과 idleTimeout 시간보다 긴 2:02 오후에 2 시간의 유휴 시간 후 창을 닫습니다. idleTimeout 시간이 10 분으로만 설정된 경우, Atlas Stream Processing은 idleTimeout 시간보다 긴 1:00 오후에 58 분의 유휴 시간 후 창을 닫으며, 나머지 창은 시간.

또는 정수 값 을 사용하여 이 설정을 정의할 수 0 있습니다. 자세한 내용은 파이프라인 정의를 참조하세요.

allowedLateness

문서

옵션

창 종료 시간에 대해 문서를 처리한 후 늦게 도착한 데이터를 받기 위해 소스에서 생성된 을 얼마나 오래 열어둘지 지정하는 문서입니다. 생략하면 기본값은 3초입니다.

또는 정수 값 을 사용하여 이 설정을 정의할 수 0 있습니다. 자세한 내용은 파이프라인 정의를 참조하세요.

Atlas Stream Processing은 파이프라인당 하나의 윈도우 단계만 지원합니다.

$group 단계를 윈도우 단계에 적용하는 경우 단일 그룹 키 의 RAM 용량은 100 메가바이트로 제한됩니다.

특정 집계 단계에 대한 지원은 Windows 내에서 제한되거나 사용할 수 없을 수 있습니다. 자세한 내용은 지원되는 집계 파이프라인 단계를 참조하세요.

서비스 중단 시, 중단 시점에서 창의 내부 파이프라인을 재개할 수 있습니다 자세한 내용은 체크포인트를 참조하세요.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다.

  2. $tumblingWindow 단계는 30초 동안 지속되는 연속 창을 정의합니다. 각 창은 내부 pipeline을 실행하여 해당 창의 기간 동안 평균, 중앙값, 최대값 및 최소값 atmosphericPressureObservation.altimeterSetting.value를 찾습니다. 그런 다음, pipeline은 표현하는 창의 시작 타임스탬프에 해당하는 _id와 해당 창에 대해 지정된 값이 포함된 단일 문서를 출력합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$tumblingWindow': {
interval: {
size: 30,
unit: "second"
},
pipeline: [{
$group: {
_id: "$_stream_meta.window.start",
averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
medianPressure: {
$median: {
input: "$atmosphericPressureObservation.altimeterSetting.value",
method: "approximate"
}
},
maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
}
}]
}
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-09-26T16:34:00.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-09-26T16:34:00.000Z'),
end: ISODate('2024-09-26T16:34:30.000Z')
}
},
averagePressure: 5271.47894736842,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
},
{
_id: ISODate('2024-09-26T16:34:30.000Z'),
_stream_meta: {
source: { type: 'kafka' },
window: {
start: ISODate('2024-09-26T16:34:30.000Z'),
end: ISODate('2024-09-26T16:35:00.000Z')
}
},
averagePressure: 5507.9,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$hoppingWindow

이 페이지의 내용