문서 메뉴
문서 홈
/
MongoDB 아틀라스
/ /

$emit

이 페이지의 내용

  • 정의
  • 구문
  • Apache Kafka 브로커
  • Atlas Time Series 컬렉션
  • 행동

단계는 $emit 메시지를 내보낼 연결 레지스트리 의 연결을 지정합니다. 연결은 Apache Kafka 중 하나여야 합니다. 브로커 또는 time series 컬렉션.

처리된 데이터를 Apache Kafka 에 쓰려면 $emit 브로커를 사용하려면 다음 프로토타입 형식의 파이프라인 단계를 사용하세요.

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
필수 사항
연결 레지스트리 에 표시되는 데이터를 수집할 연결의 이름입니다.
topic
문자열 | 표현식
필수 사항
Apache Kafka 의 이름 메시지를 보낼 주제입니다.
config
문서
옵션
다양한 기본값을 재정의하는 필드가 포함된 문서입니다.
config.key
객체 | 문자열
옵션

Apache Kafka 로 평가되는 표현식 메시지 키.

config.key 을 지정하는 경우 config.keyFormat 을 지정해야 합니다.

config.keyFormat
문자열
조건부

Apache Kafka 를 역직렬화하는 데 사용되는 데이터 유형 주요 데이터. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData 입니다. config.key 을 지정하는 경우 config.keyFormat 을 지정해야 합니다. 문서의 config.key 가 지정된 데이터 유형으로 성공적으로 역직렬화하지 못하면 Atlas Stream Processing이 해당 문서를 데드 레터 대기열로 보냅니다.

config.outputFormat
문자열
옵션

Apache Kafka 로 메시지를 내보낼 때 사용할 JSON 형식 . 다음 값 중 하나여야 합니다.

  • "relaxedJson"

  • "canonicalJson"

기본값은 "relaxedJson"입니다.

처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit 파이프라인 단계를 사용합니다.

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
connectionName
문자열
필수 사항
연결 레지스트리 에 표시되는 데이터를 수집할 연결의 이름입니다.
db
문자열
필수 사항
대상 time series 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다.
coll
문자열
필수 사항
쓸 Atlas time series 컬렉션의 이름입니다.
timeseries
문서
필수 사항
컬렉션의 time series 필드 를 정의하는 문서입니다.

참고

time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.

$emit 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit 단계는 하나만 사용할 수 있습니다.

스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.

동적 표현식 을 필드의 값으로 사용하여 topic 스트림 프로세서가 다른 대상 Apache Kafka 에 쓸 수 있도록 할 수 있습니다. 주제를 메시지별로 표시합니다.

예제

다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

이러한 각 항목을 고유한 Apache Kafka 로 정렬하려면 $emit 다음 단계를 작성할 수 있습니다.

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

$emit 단계는 다음과 같습니다.

  • VIP 이라는 주제에 Very Important Industries 메시지를 씁니다.

  • employee 이라는 주제에 N. E. Buddy 메시지를 씁니다.

  • contractor 이라는 주제에 Khan Traktor 메시지를 씁니다.

문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.

아직 존재하지 않는 주제를 지정하면Apache Kafka 가 주제를 대상으로 하는 첫 번째 메시지를 받으면 자동으로 주제를 생성합니다.

동적 표현식을 사용하여 주제를 지정했지만 Atlas Stream Processing이 지정된 메시지의 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 구성된 데드 레터 큐 가 없는 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.

← $tumbleWindow
$merge →