$emit
定义
$emit
阶段在 连接注册表中指定要向其发送消息的连接。该连接必须是 Apache Kafka 代理或时间序列集合。
语法
Apache Kafka 代理
将处理后的数据写入 Apache Kafka $emit
代理,使用具有以下原型形式的 管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||||
---|---|---|---|---|---|---|---|---|
connectionName | 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 | |||||
topic | 字符串 |表达式 | 必需 | Apache Kafka 的名称 要向其发送消息的主题。 | |||||
config | 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 | |||||
config.headers | 表达式(expression) | Optional | 要添加到输出消息的标题。该表达式的计算结果必须为对象或数组。 如果表达式的计算结果为对象,Atlas Stream Processing 将根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。 如果表达式的计算结果为数组,则它必须采用键值对对象数组的形式。例如:
Atlas Stream Processing 从数组中的每个对象构造一个标头,其中键是标头名称,值是标头值。 Atlas Stream Processing 支持以下类型的标头值:
| |||||
config.key | 对象 | string | Optional | 求值为 Apache Kafka 消息键的表达式。 如果您指定 | |||||
config.keyFormat | 字符串 | 可选的 | 用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:
默认为 | |||||
config.outputFormat | 字符串 | Optional |
Atlas 时间序列集合
要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit
管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
connectionName | 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 |
db | 字符串 | 必需 | 包含目标时间序列集合的 Atlas 数据库的名称。 |
coll | 字符串 | 必需 | 要写入的 Atlas 时间序列集合的名称。 |
timeseries | 文档 | 必需 | 定义集合的时间序列字段的文档。 |
注意
时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。
行为
$emit
必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit
阶段。
每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。
您可以使用 动态表达式 作为 topic
字段的值,以使您的流处理器能够逐条消息写入不同的目标 Apache Kafka 主题。该表达式的结果必须为字符串。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的 Apache Kafka 主题,您可以编写以下$emit
阶段:
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
此$emit
阶段:
将
Very Important Industries
消息写入名为VIP
的主题。将
N. E. Buddy
消息写入名为employee
的主题。将
Khan Traktor
消息写入名为contractor
的主题。
有关动态表达式的更多信息,请参阅表达式操作符。
如果您指定的主题尚不存在, Apache Kafka 当它收到第一条针对该主题的消息时,会自动创建该主题。
如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果未配置死信队列,则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与在名为my_weatherdata
的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为ingestionTime
。$match
阶段排除airTemperature.value
大于或等于30.0
的文档,并将airTemperature.value
小于30.0
的文档传递到下一阶段。$emit
阶段会通过weatherStreamOutput
Kafka 代理连接将输出写入名为stream
的主题。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'airTemperature.value': { '$lt': 30 } } }, { '$emit': { connectionName: 'weatherStreamOutput', topic: 'stream' } }
stream
主题中的文档采用以下形式:
{ "st":"x+34700+119500", "position": { "type": "Point", "coordinates": [122.8,116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1","AG1","UG1","SA1","MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight":{ "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime":{ "$date":"2024-09-26T17:34:41.843Z" }, "_stream_meta":{ "source":{ "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。