$emit
定义
$emit
阶段指定 连接注册表 中要向其发送消息的连接。连接必须是 Apache Kafka 代理或 时间序列集合。
语法
Apache Kafka 代理
将处理后的数据写入 Apache Kafka $emit
代理,使用具有以下原型形式的 管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
connectionName | 字符串 | 必需 | 名称(显示在连接注册表中)从中引入数据的连接的名称。 |
topic | 字符串 |表达式 | 必需 | Apache Kafka 的名称 要向其发送消息的主题。 |
config | 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
config.key | 对象 |字符串 | Optional | 计算结果为 Apache Kafka 的表达式 消息键。 如果指定 |
config.keyFormat | 字符串 | 可选的 | 用于反序列化 Apache Kafka 的数据类型 关键数据。必须是以下值之一:
默认为 |
config.outputFormat | 字符串 | Optional | 向 Apache Kafka 发送消息时使用的 JSON 格式 。必须是以下值之一:
默认值为 |
Atlas 时间序列集合
要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit
管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
$emit
阶段采用包含以下字段的文档:
注意
时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。
行为
$emit
必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit
阶段。
每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。
您可以使用 动态表达式 作为topic
字段的值,以使流处理器能够写入不同的目标 Apache Kafka 逐条消息的主题。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的 Apache Kafka 主题,您可以编写以下$emit
阶段:
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
此$emit
阶段:
将
Very Important Industries
消息写入名为VIP
的主题。将
N. E. Buddy
消息写入名为employee
的主题。将
Khan Traktor
消息写入名为contractor
的主题。
只能使用计算结果为字符串的动态表达式。 有关动态表达式的更多信息,请参阅表达式操作符。
如果您指定的主题尚不存在, Apache Kafka 当它收到第一条针对该主题的消息时,会自动创建该主题。
如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果没有配置死信队列,Atlas Stream Processing 会完全跳过该消息并处理后续消息。