“文档” 菜单
文档首页
/
MongoDB 阿特拉斯
/ /

$emit

在此页面上

  • 定义
  • 语法
  • Apache Kafka 代理
  • Atlas 时间序列集合
  • 行为

$emit阶段指定 连接注册表 中要向其发送消息的连接。连接必须是 Apache Kafka 代理或 时间序列集合。

将处理后的数据写入 Apache Kafka $emit代理,使用具有以下原型形式的 管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明
connectionName
字符串
必需
名称(显示在连接注册表中)从中引入数据的连接的名称。
topic
字符串 |表达式
必需
Apache Kafka 的名称 要向其发送消息的主题。
config
文档
Optional
包含可覆盖各种默认值的字段的文档。
config.key
对象 |字符串
Optional

计算结果为 Apache Kafka 的表达式 消息键。

如果指定config.key ,则必须指定config.keyFormat

config.keyFormat
字符串
可选的

用于反序列化 Apache Kafka 的数据类型 关键数据。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认为binData 。如果指定config.key ,则必须指定config.keyFormat 。如果文档的config.key未成功反序列化为指定的数据类型,Atlas Stream Processing 会将其发送到死信队列。

config.outputFormat
字符串
Optional

向 Apache Kafka 发送消息时使用的 JSON 格式 。必须是以下值之一:

  • "relaxedJson"

  • "canonicalJson"

默认值为 "relaxedJson"

要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明
connectionName
字符串
必需
名称(显示在连接注册表中)从中引入数据的连接的名称。
db
字符串
必需
包含目标时间序列集合的 Atlas 数据库的名称。
coll
字符串
必需
要写入的 Atlas 时间序列集合的名称。
timeseries
文档
必需
定义集合的时间序列字段的文档。

注意

时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。

$emit 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit阶段。

每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。

您可以使用 动态表达式 作为topic 字段的值,以使流处理器能够写入不同的目标 Apache Kafka 逐条消息的主题。

例子

您有一个事务事件流,它会生成以下形式的消息:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

要将其中每一个分类到不同的 Apache Kafka 主题,您可以编写以下$emit 阶段:

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

$emit阶段:

  • Very Important Industries消息写入名为VIP的主题。

  • N. E. Buddy消息写入名为employee的主题。

  • Khan Traktor消息写入名为contractor的主题。

只能使用计算结果为字符串的动态表达式。 有关动态表达式的更多信息,请参阅表达式操作符。

如果您指定的主题尚不存在, Apache Kafka 当它收到第一条针对该主题的消息时,会自动创建该主题。

如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果没有配置死信队列,Atlas Stream Processing 会完全跳过该消息并处理后续消息。

← $tumbleWindow
$merge →