Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$source

이 페이지의 내용

  • 정의
  • 구문
  • Apache Kafka 브로커
  • MongoDB collection change stream
  • MongoDB database change stream
  • MongoDB 클러스터 전체 변경 스트림 소스
  • 문서 배열
  • 행동
  • 예시
$source

$source 단계에서는 데이터를 스트리밍할 연결 레지스트리의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.

참고

Atlas 서버리스 인스턴스를 $source로 사용할 수 없습니다.

Apache Kafka 에서 스트리밍 데이터를 작업하려면 브로커에서 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

topic

문자열 또는 문자열 배열

필수 사항

메시지를 스트리밍할 하나 이상의 Apache Kafka 주제 이름입니다. 두 개 이상의 주제에서 메시지를 스트리밍하려면 배열로 지정합니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName

문자열

옵션

$source에 의해 프로젝션된 타임스탬프 필드의 이름을 덮어쓰는 이름입니다.

Atlas Stream Processing 파이프라인의 $source 단계는 문서의 지정된 타임스탬프와 함께 _ts 필드를 프로젝션합니다. 스트리밍 데이터 소스는 _ts 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리를 수행하기 전에 tsFieldName을 사용하여 _ts라는 이름의 소스 제공 필드 이름을 변경하세요.

partitionIdleTimeout

문서

옵션

파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다.

partitionIdleTimeout.size

integer

옵션

파티션 유휴 시간 초과 기간을 지정하는 숫자입니다.

partitionIdleTimeout.unit

문자열

옵션

파티션 유휴 시간 초과 기간의 시간 단위입니다.

unit 의 값은 다음 중 하나일 수 있습니다.

  • "ms" (밀리초)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.auto_offset_reset

문자열

옵션

Apache Kafka 소스 토픽에서 어느 이벤트부터 수집을 시작할지 지정합니다. auto_offset_reset은 다음 값을 사용합니다.

  • end, latest 또는 largest 을 사용하여 애그리게이션이 초기화될 때 주제의 최신 이벤트로부터 수집을 시작합니다.

  • earliest, beginning 또는 smallest : 주제에서 가장 오래된 이벤트부터 수집을 시작합니다.

기본값은 latest입니다.

config.group_id

문자열

옵션

스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략된 경우 Atlas Stream Processing은 다음 형식으로 스트림 처리 인스턴스를 자동 생성된 ID와 연결합니다.

asp-${streamProcessorId}-consumer

Atlas Stream Processing은 Apache Kafka 에 파티션 오프셋을 커밋합니다. 체크포인트가 커밋된 후 지정된 소비자 그룹 ID에 대한 브로커. 해당 오프셋을 통한 메시지가 체크포인트에 지속적으로 기록되면 오프셋을 커밋합니다. 이를 통해 Kafka 브로커 소비자 그룹 메타데이터에서 직접 스트림 프로세서의 오프셋 지연 및 진행 상황을 추적할 수 있습니다.

config.keyFormat

문자열

옵션

Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData입니다.

config.keyFormatError

문자열

옵션

Apache Kafka 키 데이터의 직렬화 시 발생하는 오류를 처리하는 방법 다음 값 중 하나여야 합니다.

  • dlq는, 해당 문서를 데드 레터 큐에 기록합니다.

  • passThrough- 키 데이터 없이 문서를 다음 단계로 보냅니다.

참고

Atlas Stream Processing에서는 소스 데이터 스트림의 문서가 유효한 json 또는 ejson이어야 합니다. Atlas Stream Processing은 데드 레터 큐를 구성한 경우 이 요구 사항을 충족하지 않는 문서를 데드 레터 큐로 설정합니다.

Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName

문자열

옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 내부적으로 _ts라는 수신 메시지에 타임스탬프 정보를 저장하는 필드를 추가합니다. 스트리밍 데이터의 소스는 _ts라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리를 수행하기 전에 tsFieldName을 사용하여 _ts라는 이름의 소스 제공 필드 이름을 변경하세요.

db

문자열

필수 사항

connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.

coll

문자열 또는 문자열 배열

필수 사항

로 지정된 Atlas 인스턴스 에서 호스팅되는 하나 이상의 MongoDB 컬렉션 connectionName 이름입니다. 이러한 컬렉션의 변경 스트림 은 스트리밍 데이터 소스 역할을 합니다. 이 필드 를 생략하면 스트림 프로세서는 MongoDB 데이터베이스 변경 스트림에서 소싱합니다.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

timestamp

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.pipeline

문서

옵션

원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다.

Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName

문자열

옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 내부적으로 _ts라는 수신 메시지에 타임스탬프 정보를 저장하는 필드를 추가합니다. 스트리밍 데이터의 소스는 _ts라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리를 수행하기 전에 tsFieldName을 사용하여 _ts라는 이름의 소스 제공 필드 이름을 변경하세요.

db

문자열

필수 사항

connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

timestamp

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline

문서

옵션

원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다.

전체 Atlas 클러스터 변경 스트림에서 스트리밍 데이터를 처리하기 위해 $source 단계는 다음과 같은 프로토타입 형태를 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName

문자열

옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 내부적으로 _ts라는 수신 메시지에 타임스탬프 정보를 저장하는 필드를 추가합니다. 스트리밍 데이터의 소스는 _ts라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리를 수행하기 전에 tsFieldName을 사용하여 _ts라는 이름의 소스 제공 필드 이름을 변경하세요.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

timestamp

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline

문서

옵션

원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다.

문서 배열에서 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName

문자열

옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 내부적으로 _ts라는 수신 메시지에 타임스탬프 정보를 저장하는 필드를 추가합니다. 스트리밍 데이터의 소스는 _ts라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리를 수행하기 전에 tsFieldName을 사용하여 _ts라는 이름의 소스 제공 필드 이름을 변경하세요.

documents

배열

조건부

Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. connectionName 필드를 사용할 때는 이 필드를 사용하지 마세요.

$source 는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source 단계는 하나만 사용할 수 있습니다.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여 ingestionTime으로 설정합니다.

  2. $match 단계에서는 dewPoint.value 값이 5.0 미만인 문서를 제외하고 dewPoint.value 값이 5.0 보다 큰 문서를 다음 단계로 전달합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

집계 파이프라인