Docs 菜单

托管流处理器

Atlas Stream Processing 流处理器将唯一命名的流媒体聚合管道的逻辑应用于流媒体数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。 您只能在存储其定义的Atlas Stream Processing实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程最多 4 个流处理器。 对于超过此限制的其他处理器,Atlas Stream Processing 会分配新资源。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

您可以使用 sp.process() 方法交互式创建流处理器。您以交互方式创建的流处理器会表现出以下行为:

  • 将输出和死信队列文档写入shell

  • 创建后立即开始运行

  • 运行 10 分钟或直到用户停止它们

  • 停止后不要继续

您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。

sp.process()的语法如下:

sp.process(<pipeline>)
字段
类型
必要性
说明

pipeline

阵列

必需

要应用于流媒体数据的管道

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

以下命令创建一个流处理器,该处理器应用pipeline中定义的逻辑。

sp.process(pipeline)

要创建流处理器:

Atlas Administration API 提供了一个用于创建流处理器的端点。

创建一个流处理器

要使用mongosh创建新的流处理器,请使用sp.createStreamProcessor()方法。 它具有以下语法:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
类型
必要性
说明

name

字符串

必需

流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。

pipeline

阵列

必需

要应用于流媒体数据的管道

options

对象

Optional

为流处理器定义各种可选设置的对象。

options.dlq

对象

可选的

为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义options字段,则此字段是必需的。

options.dlq.connectionName

字符串

可选的

人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.db

字符串

可选的

options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.coll

字符串

可选的

options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh提示符中,分配一个包含以下 DLQ 属性的对象:

  • connectionName

  • 数据库名称

  • 集合名称

以下示例通过metadata.dlq collection 中的cluster01连接定义了一个 DLQ。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

以下命令创建一个名为proc01的流处理器,它应用pipeline中定义的逻辑。 在处理中引发错误的文档将写入deadLetter中定义的 DLQ。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要启动流处理器:

Atlas Administration API 提供了一个用于启动流处理器的端点。

启动一个流处理器

要使用 mongosh 启动现有流处理器,请使用 sp.<streamprocessor>.start() 方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要启动名为proc01的流处理器,请运行以下命令:

sp.proc01.start()

此方法返回:

  • true 如果流处理器存在且当前未运行。

  • false 如果您尝试启动不存在的的流处理器,或存在且当前正在运行的流处理器。

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要停止流处理器:

Atlas Administration API 提供了一个用于停止流处理器的端点。

停止一个流处理器

要使用 mongosh 停止现有的流处理器,请使用 sp.<streamprocessor>.stop() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。

例如,要停止名为proc01的流处理器,请运行以下命令:

sp.proc01.stop()

此方法返回:

  • true 如果流处理器存在且当前正在运行。

  • false 如果流处理器不存在,或者流处理器目前未运行。

您可以修改现有流处理器的以下元素:

要修改流处理器,请执行以下步骤:

  1. 停止流处理器。

  2. 将您的更新应用于流处理器。

  3. 重新启动流处理器。

默认情况下,修改后的处理器会从最后一个检查点恢复。或者,您可以设置 resumeFromCheckpoint=false,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上被完全重新计算。

注意

如果您使用Operator (包含is contains、 等匹配器表达式)为其配置了 Stream Processor State is failed警报的流处理器更改了名称,则Atlas不会触发如果匹配器表达式与新名称不匹配,则针对重命名的流处理器发出警报。要监控重命名的流处理器,请重新配置警报。

当启用默认设置 resumeFromCheckpoint=true 时,以下限制将适用:

  • 您无法修改 $source 阶段。

  • 您无法修改窗口的间隔时间。

  • 您无法移除窗口。

  • 只有当窗口的内部管道中包含 $group$sort 阶段时,您才能修改带有窗口的管道。

  • 您无法更改现有的窗口类型。例如,您无法将 $tumblingWindow 更改为 $hoppingWindow,反之亦然。

  • 带有窗口的处理器可能会在重新计算窗口时重新处理某些数据。

需要mongosh v2.3.4+。

使用 sp.<streamprocessor>.modify() 命令来修改现有的流处理器。<streamprocessor> 必须是为当前流处理实例定义的已停止流处理器的名称。

例如,要修改名为 proc01 的流处理器,请运行以下命令:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

Atlas Administration API 提供了一个用于修改流处理器的终结点。

修改一个流处理器

要删除流处理器:

Atlas Administration API 提供了一个用于删除流处理器的端点。

删除一个流处理器

要使用 mongosh 删除现有的流处理器,请使用sp.<streamprocessor>.drop()方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要删除名为proc01的流处理器,请运行以下命令:

sp.proc01.drop()

此方法返回:

  • true 如果流处理器存在。

  • false 如果流处理器不存在。

删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。

列出所有可用的流处理器:

Atlas Administration API 提供了一个用于列出所有可用流处理器的端点。

列出流处理器

要使用 mongosh 方法列出当前流处理实例上所有可用的流处理器,请使用 sp.listStreamProcessors()方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它采用以下语法:

sp.listStreamProcessors(<filter>)

<filter> 是一个文档,指定了过滤列表的字段。

例子

以下示例显示了未经筛选的请求的返回值:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state""running"的情况,您将看到以下输出:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

要使用 mongosh 将现有流处理器中的采样结果数组返回到 STDOUT,请使用 sp.<streamprocessor>.sample() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令示例来自名为 proc01 的流处理器。

sp.proc01.sample()

此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要查看流处理器的统计信息:

Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。

获取一个流处理器

要使用 mongosh 返回一份总结现有流处理器当前状态的文档,请使用 sp.<streamprocessor>.stats() 方法。streamprocessor 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。 它采用以下语法:

sp.<streamprocessor>.stats({options: {<options>}})

其中options是具有以下字段的可选文档:

字段
类型
说明

scale

整型

用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定scale1024

verbose

布尔

指定输出文档详细程度的标志。 如果设置为true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false

输出文档包含以下字段:

字段
类型
说明

ns

字符串

定义流处理器的命名空间。

stats

对象

描述流处理器操作状态的文档。

stats.name

字符串

流处理器的名称。

stats.status

字符串

流处理器的状态。 此字段可为以下值:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

整型

大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。

stats.inputMessageCount

整型

发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。

stats.inputMessageSize

整型

发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。

stats.outputMessageCount

整型

该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。

stats.outputMessageSize

整型

流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。

stats.dlqMessageCount

整型

stats.dlqMessageSize

整型

stats.changeStreamTimeDifferenceSecs

整型

最新变更流恢复令牌所代表的事件时间与 oplog 中最新事件之间的时间差(以秒为单位)。

stats.changeStreamState

token

最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。

stats.stateSize

整型

Windows 用于存储处理器状态的字节数。

stats.watermark

整型

当前水印的时间戳。

stats.operatorStats

阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

stats.operatorStats 包含以下独特字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats 还包括以下字段,前提是您已传递 verbose 选项,并且您的处理器包含窗口阶段:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

整型

操作符的最大内存使用量(以字节或千字节为单位)。

stats.operatorStats.executionTime

整型

操作符的总执行时间(以秒为单位)。

stats.minOpenWindowStartTime

日期

最小打开窗口的开始时间。此值是可选的。

stats.maxOpenWindowStartTime

日期

最大开放窗口的开始时间。此值是可选的。

stats.kafkaPartitions

阵列

Apache Kafka 代理分区的偏移量信息。kafkaPartitions 仅适用于使用 Apache Kafka 源的连接。

stats.kafkaPartitions.partition

整型

Apache Kafka 主题分区编号.

stats.kafkaPartitions.currentOffset

整型

流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1

stats.kafkaPartitions.checkpointOffset

整型

流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。

stats.kafkaPartitions.isIdle

布尔

指示分区是否空闲的标志。此值默认设置为 false

例如,以下显示了名为inst01的Atlas Stream Processing实例上名为proc01的流处理器的状态,其中项大小以 KB 为单位:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}