$source
정의
$source
단계는 데이터를 스트리밍할 연결 레지스트리 의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.
Apache Kafka 브로커
MongoDB collection change stream
MongoDB database change stream
문서 배열
참고
Atlas 서버리스 인스턴스를 $source
로 사용할 수 없습니다.
구문
Apache Kafka 브로커
Apache Kafka 에서 스트리밍 데이터를 작업하려면 브로커에서 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "connectionName": "<registered-connection>", "topic" : "<source-topic>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |
---|---|---|---|---|
connectionName | 문자열 | 필수 사항 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. | |
topic | 문자열 | 필수 사항 | Apache Kafka 의 이름 메시지를 스트리밍할 주제입니다. | |
timeField | 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
| |
tsFieldName | 문자열 | 옵션 | $source에 의해 프로젝션된 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인의 $source 단계는 문서에 할당된 타임스탬프를 사용하여 | |
partitionIdleTimeout | 문서 | 옵션 | 파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다. | |
partitionIdleTimeout.size | integer | 옵션 | 파티션 유휴 시간 초과 기간을 지정하는 숫자입니다. | |
partitionIdleTimeout.unit | 문자열 | 옵션 | 파티션 유휴 시간 초과 기간의 시간 단위입니다.
| |
config | 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |
config.auto_offset_reset | 문자열 | 옵션 | Apache Kafka 에서 어떤 이벤트를 수집을 시작할 소스 주제입니다.
기본값은 | |
config.group_id | 문자열 | 옵션 | 스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략된 경우 Atlas Stream Processing은 다음 형식으로 스트림 처리 인스턴스를 자동 생성된 ID와 연결합니다.
Atlas Stream Processing은 Apache Kafka 에 파티션 오프셋을 커밋합니다. 체크포인트가 커밋된 후 지정된 소비자 그룹 ID에 대한 브로커. 해당 오프셋을 통한 메시지가 체크포인트에 지속적으로 기록되면 오프셋을 커밋합니다. 이를 통해 Kafka 브로커 소비자 그룹 메타데이터에서 직접 스트림 프로세서의 오프셋 지연 및 진행 상황을 추적할 수 있습니다. | |
config.keyFormat | 문자열 | 옵션 | Apache Kafka 를 역직렬화하는 데 사용되는 데이터 유형 주요 데이터. 다음 값 중 하나여야 합니다.
기본값은 | |
config.keyFormatError | 문자열 | 옵션 | Apache Kafka 를 역직렬화할 때 발생하는 오류를 처리하는 방법 주요 데이터. 다음 값 중 하나여야 합니다.
|
참고
Atlas Stream Processing을 사용하려면 소스 데이터 스트림의 문서가 유효한 json
또는 ejson
이어야 합니다. Atlas Stream Processing은 이 요구 사항을 충족하지 않는 문서를 구성한 경우 데드 레터 대기열 로 설정합니다.
MongoDB collection change stream
Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
connectionName | 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
timeField | 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
tsFieldName | 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 |
db | 문자열 | 필수 사항 | connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다. |
coll | 문자열 또는 문자열 배열 | 필수 사항 | connectionName 로 지정된 Atlas 인스턴스에서 호스팅되는 하나 이상의 MongoDB 컬렉션 이름입니다. 이러한 컬렉션의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다. 이 필드를 생략하면 스트림 프로세서는 MongoDB 데이터베이스 변경 스트림에서 소싱합니다. |
config | 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
config.startAfter | 토큰 | 조건부 | 소스가 보고를 시작하는 변경 이벤트입니다. 이는 재개 토큰 형식을 취합니다.
|
config.startAtOperationTime | timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
config.fullDocument | 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.fullDocumentOnly | 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.fullDocumentBeforeChange | 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
컬렉션 변경 스트림과 함께 이 필드를 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.pipeline | 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하는 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
MongoDB database change stream
Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
connectionName | 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
timeField | 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
tsFieldName | 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 |
db | 문자열 | 필수 사항 | connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다. |
config | 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
config.startAfter | 토큰 | 조건부 | 소스가 보고를 시작하는 변경 이벤트입니다. 이는 재개 토큰 형식을 취합니다.
|
config.startAtOperationTime | timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
config.fullDocument | 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.fullDocumentOnly | 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.fullDocumentBeforeChange | 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
config.pipeline | 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하는 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
문서 배열
문서 배열에서 작업하기 위해 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
timeField | 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
tsFieldName | 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 체크포인트 정보를 저장하기 위해 수신 메시지에 |
documents | 배열 | 조건부 | Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. connectionName 필드를 사용할 때는 이 필드를 사용하지 마세요. |
행동
$source
는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source
단계는 하나만 사용할 수 있습니다.