$source
이 페이지의 내용
정의
$source
단계에서는 데이터를 스트리밍할 연결 레지스트리의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.
Apache Kafka 브로커
MongoDB collection change stream
MongoDB database change stream
문서 배열
참고
Atlas 서버리스 인스턴스를 $source
로 사용할 수 없습니다.
구문
Apache Kafka 브로커
Apache Kafka 에서 스트리밍 데이터를 작업하려면 브로커에서 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |
---|---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. | |
| 문자열 또는 문자열 배열 | 필수 사항 | 메시지를 스트리밍할 하나 이상의 Apache Kafka 주제 이름입니다. 두 개 이상의 주제에서 메시지를 스트리밍하려면 배열로 지정합니다. | |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
| |
| 문자열 | 옵션 | $source에 의해 프로젝션된 타임스탬프 필드의 이름을 덮어쓰는 이름입니다. Atlas Stream Processing 파이프라인의 $source 단계는 문서의 지정된 타임스탬프와 함께 | |
| 문서 | 옵션 | 파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다. | |
| integer | 옵션 | 파티션 유휴 시간 초과 기간을 지정하는 숫자입니다. | |
| 문자열 | 옵션 | 파티션 유휴 시간 초과 기간의 시간 단위입니다.
| |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |
| 문자열 | 옵션 | Apache Kafka 소스 토픽에서 어느 이벤트부터 수집을 시작할지 지정합니다.
기본값은 | |
| 문자열 | 옵션 | 스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략된 경우 Atlas Stream Processing은 다음 형식으로 스트림 처리 인스턴스를 자동 생성된 ID와 연결합니다.
Atlas Stream Processing은 Apache Kafka 에 파티션 오프셋을 커밋합니다. 체크포인트가 커밋된 후 지정된 소비자 그룹 ID에 대한 브로커. 해당 오프셋을 통한 메시지가 체크포인트에 지속적으로 기록되면 오프셋을 커밋합니다. 이를 통해 Kafka 브로커 소비자 그룹 메타데이터에서 직접 스트림 프로세서의 오프셋 지연 및 진행 상황을 추적할 수 있습니다. | |
| 문자열 | 옵션 | Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.
기본값은 | |
| 문자열 | 옵션 | Apache Kafka 키 데이터의 직렬화 시 발생하는 오류를 처리하는 방법 다음 값 중 하나여야 합니다.
|
참고
Atlas Stream Processing에서는 소스 데이터 스트림의 문서가 유효한 json
또는 ejson
이어야 합니다. Atlas Stream Processing은 데드 레터 큐를 구성한 경우 이 요구 사항을 충족하지 않는 문서를 데드 레터 큐로 설정합니다.
MongoDB collection change stream
Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문자열 | 필수 사항 |
|
| 문자열 또는 문자열 배열 | 필수 사항 | 로 지정된 Atlas 인스턴스 에서 호스팅되는 하나 이상의 MongoDB 컬렉션 |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
MongoDB database change stream
Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문자열 | 필수 사항 |
|
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
MongoDB 클러스터 전체 변경 스트림 소스
전체 Atlas 클러스터 변경 스트림에서 스트리밍 데이터를 처리하기 위해 $source
단계는 다음과 같은 프로토타입 형태를 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
문서 배열
문서 배열에서 작업하기 위해 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 배열 | 조건부 | Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. |
행동
$source
는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source
단계는 하나만 사용할 수 있습니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.$match
단계에서는dewPoint.value
값이5.0
미만인 문서를 제외하고dewPoint.value
값이5.0
보다 큰 문서를 다음 단계로 전달합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.