문서 메뉴
문서 홈
/
MongoDB 아틀라스
/ /

$source

이 페이지의 내용

  • 정의
  • 구문
  • Apache Kafka 브로커
  • MongoDB collection change stream
  • MongoDB database change stream
  • 문서 배열
  • 행동
$source

$source 단계는 데이터를 스트리밍할 연결 레지스트리 의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.

참고

Atlas 서버리스 인스턴스를 $source 로 사용할 수 없습니다.

Apache Kafka 에서 스트리밍 데이터를 작업하려면 브로커에서 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : "<source-topic>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
필수 사항
데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.
topic
문자열
필수 사항
Apache Kafka 의 이름 메시지를 스트리밍할 주제입니다.
timeField
문서
옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName
문자열
옵션

$source에 의해 프로젝션된 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인의 $source 단계는 문서에 할당된 타임스탬프를 사용하여 _ts 필드를 프로젝션합니다. 스트리밍 데이터의 소스는 _ts 이라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리가 수행되기 전에 tsFieldName 를 사용하여 _ts 이라는 소스 제공 필드의 이름을 변경합니다.

partitionIdleTimeout
문서
옵션
파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다.
partitionIdleTimeout.size
integer
옵션
파티션 유휴 시간 초과 기간을 지정하는 숫자입니다.
partitionIdleTimeout.unit
문자열
옵션

파티션 유휴 시간 초과 기간의 시간 단위입니다.

unit 의 값은 다음 중 하나일 수 있습니다.

  • "ms" (밀리초)

  • "second"

  • "minute"

  • "hour"

  • "day"

config
문서
옵션
다양한 기본값을 재정의하는 필드가 포함된 문서입니다.
config.auto_offset_reset
문자열
옵션

Apache Kafka 에서 어떤 이벤트를 수집을 시작할 소스 주제입니다. auto_offset_reset 는 다음 값을 사용합니다.

  • end, latest 또는 largest 을 사용하여 애그리게이션이 초기화될 때 주제의 최신 이벤트로부터 수집을 시작합니다.

  • earliest, beginning 또는 smallest : 주제에서 가장 오래된 이벤트부터 수집을 시작합니다.

기본값은 latest입니다.

config.group_id
문자열
옵션

스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략된 경우 Atlas Stream Processing은 다음 형식으로 스트림 처리 인스턴스를 자동 생성된 ID와 연결합니다.

asp-${streamProcessorId}-consumer

Atlas Stream Processing은 Apache Kafka 에 파티션 오프셋을 커밋합니다. 체크포인트가 커밋된 후 지정된 소비자 그룹 ID에 대한 브로커. 해당 오프셋을 통한 메시지가 체크포인트에 지속적으로 기록되면 오프셋을 커밋합니다. 이를 통해 Kafka 브로커 소비자 그룹 메타데이터에서 직접 스트림 프로세서의 오프셋 지연 및 진행 상황을 추적할 수 있습니다.

config.keyFormat
문자열
옵션

Apache Kafka 를 역직렬화하는 데 사용되는 데이터 유형 주요 데이터. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData입니다.

config.keyFormatError
문자열
옵션

Apache Kafka 를 역직렬화할 때 발생하는 오류를 처리하는 방법 주요 데이터. 다음 값 중 하나여야 합니다.

참고

Atlas Stream Processing을 사용하려면 소스 데이터 스트림의 문서가 유효한 json 또는 ejson 이어야 합니다. Atlas Stream Processing은 이 요구 사항을 충족하지 않는 문서를 구성한 경우 데드 레터 대기열 로 설정합니다.

Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
조건부
데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.
timeField
문서
옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName
문자열
옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 _ts 라는 필드를 내부적으로 추가합니다. 스트리밍 데이터의 소스는 _ts 이라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리가 발생하기 전에 tsFieldName 를 사용하여 _ts 로 명명된 소스 제공 필드의 이름을 변경합니다.

db
문자열
필수 사항
connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.
coll
문자열 또는 문자열 배열
필수 사항
connectionName 로 지정된 Atlas 인스턴스에서 호스팅되는 하나 이상의 MongoDB 컬렉션 이름입니다. 이러한 컬렉션의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다. 이 필드를 생략하면 스트림 프로세서는 MongoDB 데이터베이스 변경 스트림에서 소싱합니다.
config
문서
옵션
다양한 기본값을 재정의하는 필드가 포함된 문서입니다.
config.startAfter
토큰
조건부

소스가 보고를 시작하는 변경 이벤트입니다. 이는 재개 토큰 형식을 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime
timestamp
조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.fullDocument
문자열
조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly
부울
조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange
문자열
옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline
문서
옵션
원점에서 변경 스트림 출력을 필터링하는 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다.

Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
조건부
데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.
timeField
문서
옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName
문자열
옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 _ts 라는 필드를 내부적으로 추가합니다. 스트리밍 데이터의 소스는 _ts 이라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리가 발생하기 전에 tsFieldName 를 사용하여 _ts 로 명명된 소스 제공 필드의 이름을 변경합니다.

db
문자열
필수 사항
connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.
config
문서
옵션
다양한 기본값을 재정의하는 필드가 포함된 문서입니다.
config.startAfter
토큰
조건부

소스가 보고를 시작하는 변경 이벤트입니다. 이는 재개 토큰 형식을 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime
timestamp
조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.fullDocument
문자열
조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly
부울
조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange
문자열
옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline
문서
옵션
원점에서 변경 스트림 출력을 필터링하는 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다.

문서 배열에서 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
timeField
문서
옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

tsFieldName
문자열
옵션

소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다.

Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 _ts 라는 필드를 내부적으로 추가합니다. 스트리밍 데이터의 소스는 _ts 이라는 필드를 사용하여 각 메시지의 타임스탬프를 저장할 수도 있습니다. 이러한 필드 간의 충돌을 방지하려면 추가 처리가 발생하기 전에 tsFieldName 를 사용하여 _ts 로 명명된 소스 제공 필드의 이름을 변경합니다.

documents
배열
조건부
Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. connectionName 필드를 사용할 때는 이 필드를 사용하지 마세요.

$source 는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source 단계는 하나만 사용할 수 있습니다.

← 집계 파이프라인