Docs Menu

$merge

$merge 단계는 메시지를 쓸 연결을 연결 레지스트리에서 명시합니다. 연결은 Atlas 연결이어야 합니다.

$merge 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge",
"whenNotMatched": "insert | discard",
"parallelism": <integer>
}
}

Atlas Stream Processing 버전의 $merge는 대부분 Atlas Data Federation 버전과 동일한 필드를 사용합니다. Atlas Stream Processing은 또한 $merge의 구현에 고유하거나 이를 위해 수정된 다음 필드를 사용합니다. Atlas Data Federation $merge와 공유되는 필드에 대해 자세히 알아보려면 $merge 구문을 참조하세요.

필드
필요성
설명

into

필수 사항

Atlas Stream Processing이 Atlas 연결에만 $merge를 지원한다는 점을 고려하여 간소화되었습니다.

자세한 내용은 Atlas Data Federation $merge 필드에 대한 설명을 참조하세요.

parallelism

조건부

쓰기 작업을 분산할 스레드의 수입니다. 1에서 16 사이의 정수 값이어야 합니다. 병렬 처리 값이 높을수록 처리량이 증가합니다. 그러나 더 높은 값은 스트림 프로세서와 이를 기록하는 클러스터가 더 많은 컴퓨팅 리소스를 사용해야 합니다.

into.coll 또는 into.db에 동적 표현식 값을 사용하는 경우 이 값을 1보다 크게 설정할 수 없습니다.

$merge 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge 단계는 하나만 사용할 수 있습니다.

샤딩된 컬렉션에 대해 $merge를 사용할 때에는 on 필드에 특별한 요구 사항이 있습니다. 자세한 내용은 $merge 구문을 참조하세요.

into.coll 또는 into.db에 동적 표현식 값을 사용하는 경우 parallelism 값을 1보다 크게 설정할 수 없습니다.

다음 필드의 값으로 동적 표현식을 사용할 수 있습니다.

  • into.db

  • into.coll

이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.

예시

다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge 단계를 작성할 수 있습니다.

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

$merge 단계는 다음과 같습니다.

  • VIP.subscription 이라는 Atlas collection에 Very Important Industries 메시지를 씁니다.

  • employee.requisition 이라는 Atlas collection에 N. E. Buddy 메시지를 씁니다.

  • contractor.billableHours 이라는 Atlas collection에 Khan Traktor 메시지를 씁니다.

문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.

동적 표현식으로 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 데드 레터 큐가 구성된 경우 해당 메시지를 보내고 후속 메시지를 처리합니다. 데드레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.

여러 Apache Kafka 주제의 스트리밍 데이터를 Atlas 클러스터의 컬렉션에 저장하려면 $merge 단계와 $source 단계를 사용하세요. $source 단계는 데이터를 읽을 주제를 지정합니다. $merge 단계는 데이터를 대상 컬렉션에 씁니다.

다음 구문을 사용합니다.

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여 ingestionTime으로 설정합니다.

  2. $match 단계에서는 dewPoint.value 값이 5.0 미만인 문서를 제외하고 dewPoint.value 값이 5.0 보다 큰 문서를 다음 단계로 전달합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.