Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$emit

在此页面上

  • 定义
  • 语法
  • Apache Kafka 代理
  • Atlas 时间序列集合
  • 行为
  • 示例

$emit 阶段在 连接注册表中指定要向其发送消息的连接。该连接必须是 Apache Kafka 代理或时间序列集合。

将处理后的数据写入 Apache Kafka $emit代理,使用具有以下原型形式的 管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

从中提取数据的连接的名称,如连接注册表所示。

topic

字符串 |表达式

必需

Apache Kafka 的名称 要向其发送消息的主题。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.acks

int

Optional

成功执行 操作所需的Apache Kafka集群确认次数。$emit

默认值为 all。 Atlas Stream Processing支持以下值:

  • -1

  • 0

  • 1

  • all

config.compression_type

字符串

Optional

生产者生成的所有数据的压缩类型。默认为 none(即不压缩)。有效值为:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

压缩用于全批数据,因此批处理的效率会影响压缩率;批处理越多,压缩效果越好。

config.headers

表达式(expression)

Optional

要添加到输出消息的标题。该表达式的计算结果必须为对象或数组。

如果表达式的计算结果为对象,Atlas Stream Processing 将根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。

如果表达式的计算结果为数组,则它必须采用键值对对象数组的形式。例如:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing 从数组中的每个对象构造一个标头,其中键是标头名称,值是标头值。

Atlas Stream Processing 支持以下类型的标头值:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

对象 | string

Optional

求值为 Apache Kafka 消息键的表达式。

如果您指定 config.key,则必须指定 config.keyFormat

config.keyFormat

字符串

可选的

用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认为 binData。如果您指定 config.key,则必须指定 config.keyFormat。如果文档的 config.key 未成功反序列化为指定的数据类型,Atlas Stream Processing 会将其发送到死信队列。

config.outputFormat

字符串

Optional

Apache Kafka 发送消息时使用的 JSON 格式。必须是下列值之一:

  • "relaxedJson"

  • "canonicalJson"

默认值为 "relaxedJson"

要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

从中提取数据的连接的名称,如连接注册表所示。

db

字符串

必需

包含目标时间序列集合的 Atlas 数据库的名称。

coll

字符串

必需

要写入的 Atlas 时间序列集合的名称。

timeseries

文档

必需

定义集合的时间序列字段的文档。

注意

时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。

$emit 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit阶段。

每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。

您可以使用 动态表达式 作为 topic 字段的值,以使您的流处理器能够逐条消息写入不同的目标 Apache Kafka 主题。该表达式的结果必须为字符串。

例子

您有一个事务事件流,它会生成以下形式的消息:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

要将其中每一个分类到不同的 Apache Kafka 主题,您可以编写以下$emit 阶段:

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

$emit阶段:

  • Very Important Industries消息写入名为VIP的主题。

  • N. E. Buddy消息写入名为employee的主题。

  • Khan Traktor消息写入名为contractor的主题。

有关动态表达式的更多信息,请参阅表达式操作符。

如果您指定的主题尚不存在, Apache Kafka 当它收到第一条针对该主题的消息时,会自动创建该主题。

如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果未配置死信队列,则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段排除 airTemperature.value 大于或等于 30.0 的文档,并将 airTemperature.value 小于 30.0 的文档传递到下一阶段。

  3. $emit 阶段会通过 weatherStreamOutput Kafka 代理连接将输出写入名为 stream 的主题。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
'$emit': {
connectionName: 'weatherStreamOutput',
topic: 'stream'
}
}

stream 主题中的文档采用以下形式:

{
"st":"x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8,116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1","AG1","UG1","SA1","MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight":{
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime":{
"$date":"2024-09-26T17:34:41.843Z"
},
"_stream_meta":{
"source":{
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$tumbleWindow