$emit
정의
단계는 $emit
메시지를 내보낼 연결 레지스트리 의 연결을 지정합니다. 연결은 Apache Kafka 중 하나여야 합니다. 브로커 또는 time series 컬렉션.
구문
Apache Kafka 브로커
처리된 데이터를 Apache Kafka 에 쓰려면 $emit
브로커를 사용하려면 다음 프로토타입 형식의 파이프라인 단계를 사용하세요.
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
connectionName | 문자열 | 필수 사항 | 연결 레지스트리 에 표시되는 데이터를 수집할 연결의 이름입니다. |
topic | 문자열 | 표현식 | 필수 사항 | Apache Kafka 의 이름 메시지를 보낼 주제입니다. |
config | 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
config.key | 객체 | 문자열 | 옵션 | Apache Kafka 로 평가되는 표현식 메시지 키.
|
config.keyFormat | 문자열 | 조건부 | Apache Kafka 를 역직렬화하는 데 사용되는 데이터 유형 주요 데이터. 다음 값 중 하나여야 합니다.
기본값은 |
config.outputFormat | 문자열 | 옵션 | Apache Kafka 로 메시지를 내보낼 때 사용할 JSON 형식 . 다음 값 중 하나여야 합니다.
기본값은 |
Atlas Time Series 컬렉션
처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit
파이프라인 단계를 사용합니다.
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
connectionName | 문자열 | 필수 사항 | 연결 레지스트리 에 표시되는 데이터를 수집할 연결의 이름입니다. |
db | 문자열 | 필수 사항 | 대상 time series 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다. |
coll | 문자열 | 필수 사항 | 쓸 Atlas time series 컬렉션의 이름입니다. |
timeseries | 문서 | 필수 사항 | 컬렉션의 time series 필드 를 정의하는 문서입니다. |
참고
time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.
행동
$emit
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit
단계는 하나만 사용할 수 있습니다.
스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.
동적 표현식 을 필드의 값으로 사용하여 topic
스트림 프로세서가 다른 대상 Apache Kafka 에 쓸 수 있도록 할 수 있습니다. 주제를 메시지별로 표시합니다.
예제
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Apache Kafka 로 정렬하려면 $emit
다음 단계를 작성할 수 있습니다.
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
이 $emit
단계는 다음과 같습니다.
VIP
이라는 주제에Very Important Industries
메시지를 씁니다.employee
이라는 주제에N. E. Buddy
메시지를 씁니다.contractor
이라는 주제에Khan Traktor
메시지를 씁니다.
문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
아직 존재하지 않는 주제를 지정하면Apache Kafka 가 주제를 대상으로 하는 첫 번째 메시지를 받으면 자동으로 주제를 생성합니다.
동적 표현식을 사용하여 주제를 지정했지만 Atlas Stream Processing이 지정된 메시지의 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 구성된 데드 레터 큐 가 없는 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.