$emit
Definição
O estágio $emit
especifica uma conexão no Registro de Conexão para o qual emitir mensagens. A conexão deve ser ou um broker do Apache Kafka ou uma coleção de séries temporais.
Sintaxe
Corretora Apache Kafka
Para escrever dados processados em um Apache Kafka corretor, use o $emit
estágio de pipeline com a seguinte forma de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||||
---|---|---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. | |||||
| corda | expressão | Obrigatório | Nome do Apache Kafka tópico para o qual emitir mensagens. | |||||
| documento | Opcional | documento que contém campo que substituem vários valores padrão. | |||||
| int | Opcional | Número de confirmações necessárias do cluster Apache Kafka para uma O valor padrão é
| |||||
| string | Opcional | Tipo de compressão para todos os dados gerados pelo produtor. O padrão é nenhum (ou seja, nenhuma compressão). Os valores válidos são:
A compactação é usada para lotes completos de dados, portanto, a eficácia do agrupamento afeta a taxa de compactação; mais resultados de agrupamento em melhor compressão. | |||||
| expressão | Opcional | Cabeçalhos a serem adicionados à mensagem de saída. A expressão deve ser avaliada como um objeto ou uma array. Se a expressão for avaliada como um objeto, o Atlas Stream Processing constrói um cabeçalho a partir de cada par de chave-valor nesse objeto, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. Se a expressão for avaliada como uma array, ela deverá assumir a forma de uma array de objetos de pares de valores-chave. Por exemplo:
O Atlas Stream Processing constrói um cabeçalho a partir de cada objeto na array, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. O Atlas Stream Processing é compatível com valores de cabeçalho dos seguintes tipos:
| |||||
| objeto | string | Opcional | Expressão que avalia para um Apache Kafka chave da mensagem. Se você especificar | |||||
| string | Condicional | Tipo de dados usado para desserializar dados-chave do Apache Kafka. Deve ser um dos seguintes valores:
O padrão é | |||||
| string | Opcional | Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:
Padrão é |
Atlas Time Series Collection
Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit
com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. |
| string | Obrigatório | Nome do banco de dados do Atlas que contém a coleção de séries temporais de destino. |
| string | Obrigatório | Nome da coleção de séries temporais do Atlas na qual escrever. |
| documento | Obrigatório | Documento que define os campos de série temporal para a coleção. |
Observação
O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.
Comportamento
$emit
deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit
por pipeline.
Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.
Você pode usar uma expressão dinâmica como o valor do campo topic
para permitir que o processador de fluxo grave em diferentes tópicos de destino do Apache Kafka mensagem por mensagem. A expressão deve ser avaliada como uma string.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um Apache Kafka distinto tópico, você pode escrever o seguinte $emit
estágio :
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
Este estágio $emit
:
Escreve a mensagem
Very Important Industries
para um tópico denominadoVIP
.Escreve a mensagem
N. E. Buddy
para um tópico denominadoemployee
.Escreve a mensagem
Khan Traktor
para um tópico denominadocontractor
.
Para mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um tópico que ainda não existe, oApache Kafka cria automaticamente o tópico quando recebe a primeira mensagem destinada a ele.
Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão de uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues se estiver configurada e processa as mensagens subsequentes. Se não houver uma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$source
estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$match
exclui documentos que têm umairTemperature.value
maior ou igual a30.0
e passa os documentos com umairTemperature.value
menor que30.0
para o próximo estágio.O estágio
$emit
grava a saída em um tópico chamadostream
pela conexão do agenteweatherStreamOutput
Kafka.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'airTemperature.value': { '$lt': 30 } } }, { '$emit': { connectionName: 'weatherStreamOutput', topic: 'stream' } }
Os documentos no tópico stream
têm o seguinte formato:
{ "st":"x+34700+119500", "position": { "type": "Point", "coordinates": [122.8,116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1","AG1","UG1","SA1","MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight":{ "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime":{ "$date":"2024-09-26T17:34:41.843Z" }, "_stream_meta":{ "source":{ "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.