$emit
Definição
O $emit
estágio especifica uma conexão no Registro de conexão para a qual emitir mensagens. A conexão deve ser um Apache Kafka corretor ou uma coleção de séries temporais.
Sintaxe
Corretora Apache Kafka
Para escrever dados processados em um Apache Kafka corretor, use o $emit
estágio de pipeline com a seguinte forma de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
connectionName | string | Obrigatório | Nome, conforme aparece no Registro de conexões, da conexão da qual os dados serão ingeridos. |
topic | corda | expressão | Obrigatório | Nome do Apache Kafka tópico para o qual emitir mensagens. |
config | documento | Opcional | documento que contém campo que substituem vários valores padrão. |
config.key | objeto | string | Opcional | Expressão que avalia para um Apache Kafka chave de mensagem. Se você especificar |
config.keyFormat | string | Condicional | Tipo de dados usado para desserializar o Apache Kafka dados principais. Deve ser um dos seguintes valores:
Padrão é |
config.outputFormat | string | Opcional | Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:
Padrão é |
Atlas Time Series Collection
Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit
com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
connectionName | string | Obrigatório | Nome, conforme aparece no Registro de conexões, da conexão da qual os dados serão ingeridos. |
db | string | Obrigatório | Nome do banco de dados do Atlas que contém a coleção de séries temporais de destino. |
coll | string | Obrigatório | Nome da coleção de séries temporais do Atlas na qual escrever. |
timeseries | documento | Obrigatório | Documento que define os campos de série temporal para a coleção. |
Observação
O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.
Comportamento
$emit
deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit
por pipeline.
Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.
Você pode usar uma expressão dinâmica como o valor do topic
campo para permitir que seu processador de fluxo grave em um Apache Kafka de destino diferente tópicos mensagem a mensagem.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um Apache Kafka distinto tópico, você pode escrever o seguinte $emit
estágio :
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
Este estágio $emit
:
Escreve a mensagem
Very Important Industries
para um tópico denominadoVIP
.Escreve a mensagem
N. E. Buddy
para um tópico denominadoemployee
.Escreve a mensagem
Khan Traktor
para um tópico denominadocontractor
.
Você pode usar apenas expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um tópico que ainda não existe, oApache Kafka cria automaticamente o tópico quando recebe a primeira mensagem destinada a ele.
Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a dead letter queue (DLQ) se estiver configurado e processará as mensagens subsequentes. Se não houver nenhuma fila de letras mortas configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.