$emit
정의
$emit
단계는 메시지를 전송할 연결을 연결 레지스트리에서 명시합니다. 이 연결은 Apache Kafka 브로커이거나 Time Series 컬렉션이어야 합니다.
구문
Apache Kafka 브로커
처리된 데이터를 Apache Kafka 에 쓰려면 $emit
브로커를 사용하려면 다음 프로토타입 형식의 파이프라인 단계를 사용하세요.
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |||||
---|---|---|---|---|---|---|---|---|
connectionName | 문자열 | 필수 사항 | 데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다. | |||||
topic | 문자열 | 표현식 | 필수 사항 | Apache Kafka 의 이름 메시지를 보낼 주제입니다. | |||||
config | 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |||||
config.headers | 표현식 | 옵션 | 출력 메시지에 추가할 헤더입니다. 표현식은 객체 또는 배열로 평가되어야 합니다. 표현식이 객체로 평가되는 경우, Atlas Stream Processing은 해당 객체의 각 키-값 쌍에서 헤더를 구성합니다. 여기서 키는 헤더 이름이고 값은 헤더 값입니다. 표현식이 배열로 평가되는 경우 키-값 쌍 객체배열 형식을 취해야 합니다. 예시:
Atlas Stream Processing은 배열의 각 객체로부터 헤더를 구성하며 이때 키는 헤더 이름이고 값은 헤더 값입니다. Atlas Stream Processing은 다음 유형의 헤더 값을 지원합니다.
| |||||
config.key | 객체 | string | 옵션 | Apache Kafka 메시지 키로 평가되는 표현식입니다.
| |||||
config.keyFormat | 문자열 | 조건부 | Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.
기본값은 | |||||
config.outputFormat | 문자열 | 옵션 | Apache Kafka로 메시지를 내보낼 때 사용할 JSON 형식입니다. 다음 값 중 하나여야 합니다.
기본값은 |
Atlas Time Series 컬렉션
처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit
파이프라인 단계를 사용합니다.
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
connectionName | 문자열 | 필수 사항 | 데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다. |
db | 문자열 | 필수 사항 | 대상 time series 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다. |
coll | 문자열 | 필수 사항 | 쓸 Atlas time series 컬렉션의 이름입니다. |
timeseries | 문서 | 필수 사항 | 컬렉션의 time series 필드 를 정의하는 문서입니다. |
참고
time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.
행동
$emit
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit
단계는 하나만 사용할 수 있습니다.
스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.
동적 표현식을 topic
필드의 값으로 사용하여 스트림 프로세서가 메시지별로 다른 대상 Apache Kafka 주제에 쓰도록 할 수 있습니다. 표현식은 문자열로 평가되어야 합니다.
예시
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Apache Kafka 로 정렬하려면 $emit
다음 단계를 작성할 수 있습니다.
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
이 $emit
단계는 다음과 같습니다.
VIP
이라는 주제에Very Important Industries
메시지를 씁니다.employee
이라는 주제에N. E. Buddy
메시지를 씁니다.contractor
이라는 주제에Khan Traktor
메시지를 씁니다.
동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
아직 존재하지 않는 주제를 지정하면Apache Kafka 가 주제를 대상으로 하는 첫 번째 메시지를 받으면 자동으로 주제를 생성합니다.
동적 표현식으로 주제를 지정했으나 Atlas Stream Processing이 주어진 메시지에 대해 해당 표현식을 평가할 수 없는 경우가 있습니다. 이 경우 설정된 경우 Atlas Stream Processing은 해당 메시지를 데드 레터 큐로 보내고 이후의 메시지를 처리합니다. 데드 레터 큐가 설정되어 있지 않은 경우 Atlas Stream Processing은 해당 메시지를 완전히 건너뛰고 이후의 메시지를 처리합니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.$match
단계에서는airTemperature.value
값이30.0
이상인 문서를 제외하고airTemperature.value
값이30.0
미만인 문서를 다음 단계로 전달합니다.$emit
단계에서는weatherStreamOutput
Kafka 브로커 연결을 통해stream
이라는 주제에 출력을 기록합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'airTemperature.value': { '$lt': 30 } } }, { '$emit': { connectionName: 'weatherStreamOutput', topic: 'stream' } }
stream
주제의 문서는 다음 형식을 따릅니다.
{ "st":"x+34700+119500", "position": { "type": "Point", "coordinates": [122.8,116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1","AG1","UG1","SA1","MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight":{ "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime":{ "$date":"2024-09-26T17:34:41.843Z" }, "_stream_meta":{ "source":{ "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.