Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$emit

이 페이지의 내용

  • 정의
  • 구문
  • Apache Kafka 브로커
  • Atlas Time Series 컬렉션
  • 행동
  • 예시

$emit 단계는 메시지를 전송할 연결을 연결 레지스트리에서 명시합니다. 이 연결은 Apache Kafka 브로커이거나 Time Series 컬렉션이어야 합니다.

처리된 데이터를 Apache Kafka 에 쓰려면 $emit 브로커를 사용하려면 다음 프로토타입 형식의 파이프라인 단계를 사용하세요.

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
필수 사항
데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다.
topic
문자열 | 표현식
필수 사항
Apache Kafka 의 이름 메시지를 보낼 주제입니다.
config
문서
옵션
다양한 기본값을 재정의하는 필드가 포함된 문서입니다.
config.headers
표현식
옵션

출력 메시지에 추가할 헤더입니다. 표현식은 객체 또는 배열로 평가되어야 합니다.

표현식이 객체로 평가되는 경우, Atlas Stream Processing은 해당 객체의 각 키-값 쌍에서 헤더를 구성합니다. 여기서 키는 헤더 이름이고 값은 헤더 값입니다.

표현식이 배열로 평가되는 경우 키-값 쌍 객체배열 형식을 취해야 합니다. 예시:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing은 배열의 각 객체로부터 헤더를 구성하며 이때 키는 헤더 이름이고 값은 헤더 값입니다.

Atlas Stream Processing은 다음 유형의 헤더 값을 지원합니다.

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key
객체 | string
옵션

Apache Kafka 메시지 키로 평가되는 표현식입니다.

config.key를 지정하는 경우 config.keyFormat을 지정해야 합니다.

config.keyFormat
문자열
조건부

Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData입니다. config.key 를 지정하는 경우 config.keyFormat을 지정해야 합니다. 문서의 config.key가 지정된 데이터 타입으로 성공적으로 역직렬화되지 않으면 Atlas Stream Processing은 이를 데드 레터 큐로 보냅니다.

config.outputFormat
문자열
옵션

Apache Kafka로 메시지를 내보낼 때 사용할 JSON 형식입니다. 다음 값 중 하나여야 합니다.

  • "relaxedJson"

  • "canonicalJson"

기본값은 "relaxedJson"입니다.

처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit 파이프라인 단계를 사용합니다.

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
필수 사항
데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다.
db
문자열
필수 사항
대상 time series 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다.
coll
문자열
필수 사항
쓸 Atlas time series 컬렉션의 이름입니다.
timeseries
문서
필수 사항
컬렉션의 time series 필드 를 정의하는 문서입니다.

참고

time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.

$emit 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit 단계는 하나만 사용할 수 있습니다.

스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.

동적 표현식topic 필드의 값으로 사용하여 스트림 프로세서가 메시지별로 다른 대상 Apache Kafka 주제에 쓰도록 할 수 있습니다. 표현식은 문자열로 평가되어야 합니다.

예시

다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

이러한 각 항목을 고유한 Apache Kafka 로 정렬하려면 $emit 다음 단계를 작성할 수 있습니다.

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

$emit 단계는 다음과 같습니다.

  • VIP 이라는 주제에 Very Important Industries 메시지를 씁니다.

  • employee 이라는 주제에 N. E. Buddy 메시지를 씁니다.

  • contractor 이라는 주제에 Khan Traktor 메시지를 씁니다.

동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.

아직 존재하지 않는 주제를 지정하면Apache Kafka 가 주제를 대상으로 하는 첫 번째 메시지를 받으면 자동으로 주제를 생성합니다.

동적 표현식으로 주제를 지정했으나 Atlas Stream Processing이 주어진 메시지에 대해 해당 표현식을 평가할 수 없는 경우가 있습니다. 이 경우 설정된 경우 Atlas Stream Processing은 해당 메시지를 데드 레터 큐로 보내고 이후의 메시지를 처리합니다. 데드 레터 큐가 설정되어 있지 않은 경우 Atlas Stream Processing은 해당 메시지를 완전히 건너뛰고 이후의 메시지를 처리합니다.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여 ingestionTime으로 설정합니다.

  2. $match 단계에서는 airTemperature.value 값이 30.0 이상인 문서를 제외하고 airTemperature.value 값이 30.0 미만인 문서를 다음 단계로 전달합니다.

  3. $emit 단계에서는 weatherStreamOutput Kafka 브로커 연결을 통해 stream이라는 주제에 출력을 기록합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
'$emit': {
connectionName: 'weatherStreamOutput',
topic: 'stream'
}
}

stream 주제의 문서는 다음 형식을 따릅니다.

{
"st":"x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8,116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1","AG1","UG1","SA1","MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight":{
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime":{
"$date":"2024-09-26T17:34:41.843Z"
},
"_stream_meta":{
"source":{
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$tumbleWindow