$merge
정의
$merge 단계는 메시지를 쓸 연결 레지스트리 의 연결을 지정합니다. 연결은 Atlas 연결이어야 합니다.
$merge
파이프라인 단계의 프로토타입 형식은 다음과 같습니다.
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
구문
Atlas Stream Processing 버전의 $merge 는 Atlas Data Federation 버전과 동일한 필드 대부분을 사용합니다. 그러나 Atlas Stream Processing은 Atlas 연결로의 병합만 지원하므로 into
구문이 간소화되었습니다. 자세한 내용은 Atlas Data Federation $merge
필드에 대한 설명 을 참조하세요.
행동
$merge
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge
단계는 하나만 사용할 수 있습니다.
on
필드에는 샤드 컬렉션에 대한 $merge
에 대한 특별한 요구 사항이 있습니다. 자세히 알아보려면 $merge 구문을 참조하세요.
동적 표현식 을 다음 필드의 값으로 사용할 수 있습니다.
into.db
into.coll
이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.
예제
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge
단계를 작성할 수 있습니다.
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
이 $merge
단계는 다음과 같습니다.
VIP.subscription
이라는 Atlas collection에Very Important Industries
메시지를 씁니다.employee.requisition
이라는 Atlas collection에N. E. Buddy
메시지를 씁니다.contractor.billableHours
이라는 Atlas collection에Khan Traktor
메시지를 씁니다.
문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
동적 표현식을 사용하여 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지의 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 구성된 데드 레터 큐 가 없는 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.