Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$emit

Nesta página

  • Definição
  • Sintaxe
  • Corretora Apache Kafka
  • Atlas Time Series Collection
  • Comportamento

O $emit estágio especifica uma conexão no Registro de conexão para a qual emitir mensagens. A conexão deve ser um Apache Kafka corretor ou uma coleção de séries temporais.

Para escrever dados processados em um Apache Kafka corretor, use o $emit estágio de pipeline com a seguinte forma de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
connectionName
string
Obrigatório
Nome, conforme aparece no Registro de conexões, da conexão da qual os dados serão ingeridos.
topic
corda | expressão
Obrigatório
Nome do Apache Kafka tópico para o qual emitir mensagens.
config
documento
Opcional
documento que contém campo que substituem vários valores padrão.
config.key
objeto | string
Opcional

Expressão que avalia para um Apache Kafka chave de mensagem.

Se você especificar config.key, deve especificar config.keyFormat.

config.keyFormat
string
Condicional

Tipo de dados usado para desserializar o Apache Kafka dados principais. Deve ser um dos seguintes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Padrão é binData. Se você especificar config.key, deverá especificar config.keyFormat. Se o config.key de um documento não for desserializado com êxito para o tipo de dados especificado, o Atlas Stream Processing o enviará para sua fila de letras mortas.

config.outputFormat
string
Opcional

Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:

  • "relaxedJson"

  • "canonicalJson"

Padrão é "relaxedJson".

Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit com o seguinte formulário de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
connectionName
string
Obrigatório
Nome, conforme aparece no Registro de conexões, da conexão da qual os dados serão ingeridos.
db
string
Obrigatório
Nome do banco de dados do Atlas que contém a coleção de séries temporais de destino.
coll
string
Obrigatório
Nome da coleção de séries temporais do Atlas na qual escrever.
timeseries
documento
Obrigatório
Documento que define os campos de série temporal para a coleção.

Observação

O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.

$emit deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit por pipeline.

Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.

Você pode usar uma expressão dinâmica como o valor do topic campo para permitir que seu processador de fluxo grave em um Apache Kafka de destino diferente tópicos mensagem a mensagem.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um Apache Kafka distinto tópico, você pode escrever o seguinte $emit estágio :

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

Este estágio $emit :

  • Escreve a mensagem Very Important Industries para um tópico denominado VIP.

  • Escreve a mensagem N. E. Buddy para um tópico denominado employee.

  • Escreve a mensagem Khan Traktor para um tópico denominado contractor.

Você pode usar apenas expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um tópico que ainda não existe, oApache Kafka cria automaticamente o tópico quando recebe a primeira mensagem destinada a ele.

Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a dead letter queue (DLQ) se estiver configurado e processará as mensagens subsequentes. Se não houver nenhuma fila de letras mortas configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.

← $tumbleWindow
$merge →