$source
이 페이지의 내용
정의
$source
단계에서는 데이터를 스트리밍할 연결 레지스트리의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.
Apache Kafka 브로커
MongoDB collection change stream
MongoDB database change stream
문서 배열
참고
Atlas 서버리스 인스턴스를 $source
로 사용할 수 없습니다.
구문
Apache Kafka 브로커
Apache Kafka 에서 스트리밍 데이터를 작업하려면 브로커에서 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |
---|---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. | |
| 문자열 또는 문자열 배열 | 필수 사항 | 메시지를 스트리밍할 하나 이상의 Apache Kafka 주제 이름입니다. 두 개 이상의 주제에서 메시지를 스트리밍하려면 배열로 지정합니다. | |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
| |
| 문자열 | 옵션 | $source에 의해 프로젝션된 타임스탬프 필드의 이름을 덮어쓰는 이름입니다. Atlas Stream Processing 파이프라인의 $source 단계는 문서의 지정된 타임스탬프와 함께 | |
| 문서 | 옵션 | 파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다. 이 필드는 기본값으로 비활성화되어 있습니다. 유휴 상태로 인해 진행되지 않는 파티션을 처리하려면 이 field에 값을 설정하십시오. | |
| integer | 옵션 | 파티션 유휴 시간 초과 기간을 지정하는 숫자입니다. | |
| 문자열 | 옵션 | 파티션 유휴 시간 초과 기간의 시간 단위입니다.
| |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |
| 문자열 | 옵션 | Apache Kafka 소스 토픽에서 어느 이벤트부터 수집을 시작할지 지정합니다.
기본값은 | |
| 문자열 | 옵션 | 스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략된 경우 Atlas Stream Processing은 다음 형식으로 스트림 처리 인스턴스를 자동 생성된 ID와 연결합니다.
Atlas Stream Processing은 Apache Kafka 에 파티션 오프셋을 커밋합니다. 체크포인트가 커밋된 후 지정된 소비자 그룹 ID에 대한 브로커. 해당 오프셋을 통한 메시지가 체크포인트에 지속적으로 기록되면 오프셋을 커밋합니다. 이를 통해 Kafka 브로커 소비자 그룹 메타데이터에서 직접 스트림 프로세서의 오프셋 지연 및 진행 상황을 추적할 수 있습니다. | |
| 문자열 | 옵션 | Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.
기본값은 | |
| 문자열 | 옵션 | Apache Kafka 키 데이터의 직렬화 시 발생하는 오류를 처리하는 방법 다음 값 중 하나여야 합니다.
|
참고
Atlas Stream Processing에서는 소스 데이터 스트림의 문서가 유효한 json
또는 ejson
이어야 합니다. Atlas Stream Processing은 데드 레터 큐를 구성한 경우 이 요구 사항을 충족하지 않는 문서를 데드 레터 큐로 설정합니다.
MongoDB collection change stream
Atlas 컬렉션 변경 스트림은 애플리케이션이 단일 컬렉션의 실시간 데이터 변경 사항에 접근할 수 있도록 합니다. 컬렉션에 대한 Change Streams을 여는 방법을 학습하려면 Change Streams을 참조하십시오.
Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문자열 | 필수 사항 |
|
| 문자열 또는 문자열 배열 | 필수 사항 |
|
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 pipeline은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다. |
MongoDB database change stream
Atlas 데이터베이스 변경 스트림은 애플리케이션이 단일 데이터베이스에서 실시간 데이터 변경 사항에 접근할 수 있도록 합니다. 데이터베이스에 대한 Change Streams을 여는 방법을 학습하려면 Change Streams을 참조하세요.
Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source
단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문자열 | 필수 사항 |
|
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 pipeline은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다. |
MongoDB 클러스터 전체 변경 스트림 소스
전체 Atlas 클러스터 변경 스트림에서 스트리밍 데이터를 처리하기 위해 $source
단계는 다음과 같은 프로토타입 형태를 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
|
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 change-stream-modify-output에 설명된 매개변수를 준수해야 합니다. |
문서 배열
문서 배열에서 작업하기 위해 $source
단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
$source
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 옵션 | 소스에서 선언한 기본 타임스탬프 필드의 이름을 재정의하는 이름입니다. Atlas Stream Processing 파이프라인은 내부적으로 |
| 배열 | 조건부 | Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. |
행동
$source
는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source
단계는 하나만 사용할 수 있습니다.
예시
Kafka 예제
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.$match
단계에서는dewPoint.value
값이5.0
미만인 문서를 제외하고dewPoint.value
값이5.0
보다 큰 문서를 다음 단계로 전달합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.
변경 스트림 예시
다음 aggregation은 cluster0-collection
소스에서 데이터를 수집하며, 이는 샘플 데이터셋이 로드된 Atlas cluster에 연결됩니다. 스트림 처리 Atlas Stream Processing 인스턴스를 생성하고 연결 레지스트리에 Atlas 클러스터에 대한 연결을 추가하는 방법을 학습하려면 Atlas Stream Processing 시작하기를 참조하세요. 이 집계는 두 단계를 실행하여 변경 스트림을 열고 sample_weatherdata
데이터베이스의 data
컬렉션에서 변경 사항을 기록합니다.
$source
단계는cluster0-collection
소스에 연결하고sample_mflix
데이터베이스의movies
컬렉션에 대한 변경 스트림을 엽니다. 변경 스트림 출력은 삽입 및 삭제 변경 이벤트만 기록되도록 필터링됩니다.$merge
단계는 필터링된 변경 스트림 문서를sample_mflix
데이터베이스의movies_changes
라는 Atlas 컬렉션에 기록합니다. 해당 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
다음 mongosh
명령은 data
문서를 삭제합니다:
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
data
문서가 삭제된 후, 스트림 프로세서는 변경 스트림 이벤트 문서를 sample_weatherdata.data_changes
컬렉션에 기록합니다. 결과 sample_weatherdata.data_changes
컬렉션에 있는 문서를 보려면 mongosh
를 사용하여 Atlas 클러스터에 연결한 후 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, _stream_meta: { source: { type: 'atlas' } }, _ts: ISODate('2025-02-05T21:26:59.313Z'), clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]