Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$emit

Nesta página

  • Definição
  • Sintaxe
  • Corretora Apache Kafka
  • Atlas Time Series Collection
  • Comportamento
  • Exemplos

O estágio $emit especifica uma conexão no Registro de Conexão para o qual emitir mensagens. A conexão deve ser ou um broker do Apache Kafka ou uma coleção de séries temporais.

Para escrever dados processados em um Apache Kafka corretor, use o $emit estágio de pipeline com a seguinte forma de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados.

topic

corda | expressão

Obrigatório

Nome do Apache Kafka tópico para o qual emitir mensagens.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.acks

int

Opcional

Número de confirmações necessárias do cluster Apache Kafka para uma $emit operação bem-sucedida.

O valor padrão é all. O Atlas Stream Processing suporta os seguintes valores:

  • -1

  • 0

  • 1

  • all

config.compression_type

string

Opcional

Tipo de compressão para todos os dados gerados pelo produtor. O padrão é nenhum (ou seja, nenhuma compressão). Os valores válidos são:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

A compactação é usada para lotes completos de dados, portanto, a eficácia do agrupamento afeta a taxa de compactação; mais resultados de agrupamento em melhor compressão.

config.headers

expressão

Opcional

Cabeçalhos a serem adicionados à mensagem de saída. A expressão deve ser avaliada como um objeto ou uma array.

Se a expressão for avaliada como um objeto, o Atlas Stream Processing constrói um cabeçalho a partir de cada par de chave-valor nesse objeto, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho.

Se a expressão for avaliada como uma array, ela deverá assumir a forma de uma array de objetos de pares de valores-chave. Por exemplo:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

O Atlas Stream Processing constrói um cabeçalho a partir de cada objeto na array, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho.

O Atlas Stream Processing é compatível com valores de cabeçalho dos seguintes tipos:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

objeto | string

Opcional

Expressão que avalia para um Apache Kafka chave da mensagem.

Se você especificar config.key, deverá especificar config.keyFormat.

config.keyFormat

string

Condicional

Tipo de dados usado para desserializar dados-chave do Apache Kafka. Deve ser um dos seguintes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

O padrão é binData. Se você especificar config.key, você deve especificar config.keyFormat. Se o config.key de um documento não for desserializado com êxito para o tipo de dados especificado, o Atlas Stream Processing o enviará para sua fila de mensagens não entregues (DLQ).

config.outputFormat

string

Opcional

Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:

  • "relaxedJson"

  • "canonicalJson"

Padrão é "relaxedJson".

Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit com o seguinte formulário de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados.

db

string

Obrigatório

Nome do banco de dados do Atlas que contém a coleção de séries temporais de destino.

coll

string

Obrigatório

Nome da coleção de séries temporais do Atlas na qual escrever.

timeseries

documento

Obrigatório

Documento que define os campos de série temporal para a coleção.

Observação

O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.

$emit deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit por pipeline.

Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.

Você pode usar uma expressão dinâmica como o valor do campo topic para permitir que o processador de fluxo grave em diferentes tópicos de destino do Apache Kafka mensagem por mensagem. A expressão deve ser avaliada como uma string.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um Apache Kafka distinto tópico, você pode escrever o seguinte $emit estágio :

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

Este estágio $emit :

  • Escreve a mensagem Very Important Industries para um tópico denominado VIP.

  • Escreve a mensagem N. E. Buddy para um tópico denominado employee.

  • Escreve a mensagem Khan Traktor para um tópico denominado contractor.

Para mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um tópico que ainda não existe, oApache Kafka cria automaticamente o tópico quando recebe a primeira mensagem destinada a ele.

Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão de uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues se estiver configurada e processa as mensagens subsequentes. Se não houver uma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $match exclui documentos que têm um airTemperature.value maior ou igual a 30.0 e passa os documentos com um airTemperature.value menor que 30.0 para o próximo estágio.

  3. O estágio $emit grava a saída em um tópico chamado stream pela conexão do agente weatherStreamOutput Kafka.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
'$emit': {
connectionName: 'weatherStreamOutput',
topic: 'stream'
}
}

Os documentos no tópico stream têm o seguinte formato:

{
"st":"x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8,116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1","AG1","UG1","SA1","MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight":{
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime":{
"$date":"2024-09-26T17:34:41.843Z"
},
"_stream_meta":{
"source":{
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$tumbleWindow