$merge
정의
$merge 단계는 메시지를 쓸 연결을 연결 레지스트리에서 명시합니다. 연결은 Atlas 연결이어야 합니다.
$merge
파이프라인 단계의 프로토타입 형식은 다음과 같습니다.
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
구문
$merge의 Atlas Stream Processing 버전은 Atlas Data Federation 버전과 동일한 필드를 대부분 사용합니다. 하지만 Atlas Stream Processing은 Atlas 연결로의 병합만 지원하므로 into
구문은 단순화됩니다. 자세한 내용은 Atlas Data Federation $merge
필드에 대한 설명을 참조하세요.
행동
$merge
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge
단계는 하나만 사용할 수 있습니다.
샤딩된 컬렉션에 대해 $merge
를 사용할 때에는 on
필드에 특별한 요구 사항이 있습니다. 자세한 내용은 $merge 구문을 참조하세요.
다음 필드의 값으로 동적 표현식을 사용할 수 있습니다.
into.db
into.coll
이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.
예시
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge
단계를 작성할 수 있습니다.
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
이 $merge
단계는 다음과 같습니다.
VIP.subscription
이라는 Atlas collection에Very Important Industries
메시지를 씁니다.employee.requisition
이라는 Atlas collection에N. E. Buddy
메시지를 씁니다.contractor.billableHours
이라는 Atlas collection에Khan Traktor
메시지를 씁니다.
문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
동적 표현식으로 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 데드 레터 큐가 구성된 경우 해당 메시지를 보내고 후속 메시지를 처리합니다. 데드레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.$match
단계에서는dewPoint.value
값이5.0
미만인 문서를 제외하고dewPoint.value
값이5.0
보다 큰 문서를 다음 단계로 전달합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.