Docs 菜单
Docs 主页
/
MongoDB Atlas
/

托管流处理器

在此页面上

  • 先决条件
  • Considerations
  • 以交互方式创建流处理器
  • 连接到您的Atlas Stream Processing实例。
  • 定义管道。
  • 创建流处理器。
  • 创建流处理器
  • 启动流处理器
  • 停止流处理器
  • 修改流处理器
  • 限制
  • 要修改流处理器,请执行以下操作:
  • 删除流处理器
  • 列出可用的流处理器
  • 来自流处理器的样本
  • 查看流处理器的统计信息

Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

您可以使用 sp.process() 方法交互式创建流处理器。您以交互方式创建的流处理器会表现出以下行为:

  • 将输出和死信队列文档写入shell

  • 创建后立即开始运行

  • 运行 10 分钟或直到用户停止它们

  • 停止后不要继续

您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。

sp.process() 通过以下语法实现:

sp.process(<pipeline>)
字段
类型
必要性
说明

pipeline

阵列

必需

要应用于流媒体数据的管道

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

以下命令创建一个流处理器,该处理器应用pipeline中定义的逻辑。

sp.process(pipeline)

要创建流处理器:

Atlas Administration API 提供了一个用于创建流处理器的端点。

创建一个流处理器

要使用mongosh创建新的流处理器,请使用sp.createStreamProcessor()方法。 它具有以下语法:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
类型
必要性
说明

name

字符串

必需

流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。

pipeline

阵列

必需

要应用于流媒体数据的管道

options

对象

Optional

为流处理器定义各种可选设置的对象。

options.dlq

对象

可选的

为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义options字段,则此字段是必需的。

options.dlq.connectionName

字符串

可选的

人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.db

字符串

可选的

options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.coll

字符串

可选的

options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh提示符中,分配一个包含以下 DLQ 属性的对象:

  • connectionName

  • 数据库名称

  • 集合名称

以下示例通过metadata.dlq collection 中的cluster01连接定义了一个 DLQ。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

以下命令创建一个名为proc01的流处理器,它应用pipeline中定义的逻辑。 在处理中引发错误的文档将写入deadLetter中定义的 DLQ。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要启动流处理器:

Atlas Administration API 提供了一个用于启动流处理器的端点。

启动一个流处理器

要使用 mongosh 启动现有流处理器,请使用 sp.<streamprocessor>.start() 方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要启动名为proc01的流处理器,请运行以下命令:

sp.proc01.start()

此方法返回:

  • true 如果流处理器存在并且当前未运行。

  • false 如果您尝试启动不存在或存在但当前正在运行的流处理器。

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要停止流处理器:

Atlas Administration API 提供了一个用于停止流处理器的端点。

停止一个流处理器

要使用 mongosh 停止现有的流处理器,请使用 sp.<streamprocessor>.stop() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。

例如,要停止名为proc01的流处理器,请运行以下命令:

sp.proc01.stop()

此方法返回:

  • true 如果流处理器存在且当前正在运行。

  • false 如果流处理器不存在,或者流处理器当前未运行。

您可以修改现有流处理器的以下元素:

要修改流处理器,请执行以下操作:

  1. 停止流处理器。

  2. 将更新应用于流处理器。

  3. 重新启动流处理器。

默认,修改后的处理器会从上一个检查点恢复。或者,您可以设立resumeFromCheckpoint=false,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上完全重新计算。

启用默认设置 resumeFromCheckpoint=true 后,存在以下应用:

  • 您无法修改 $source 阶段。

  • 您无法修改窗口的时间间隔。

  • 您无法删除窗口。

  • 只有当窗口的内部管道具有 $group$sort 阶段时,您才能修改带有窗口的管道。

  • 您无法更改现有的窗口类型。示例,您不能将 $tumblingWindow 更改为 $hoppingWindow,反之亦然。

  • 带窗口的处理器可能会重新处理某些数据,作为重新计算窗口的结果。

需要mongosh v2.3.4 +。

使用 sp.<streamprocessor>.modify() 命令修改现有流处理器。 <streamprocessor> 必须是为当前流处理实例定义的已停止流处理器的名称。

示例,要修改名为 proc01 的流处理器,请运行以下命令:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

Atlas Administration API提供了用于修改流处理器的端点。

修改一个流处理器

要删除流处理器:

Atlas Administration API 提供了一个用于删除流处理器的端点。

删除一个流处理器

要使用 mongosh 删除现有的流处理器,请使用sp.<streamprocessor>.drop()方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要删除名为proc01的流处理器,请运行以下命令:

sp.proc01.drop()

此方法返回:

  • true 如果流处理器存在。

  • false 如果流处理器不存在。

删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。

列出所有可用的流处理器:

Atlas Administration API 提供了一个用于列出所有可用流处理器的端点。

列出流处理器

要使用 mongosh 方法列出当前流处理实例上所有可用的流处理器,请使用 sp.listStreamProcessors()方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它采用以下语法:

sp.listStreamProcessors(<filter>)

<filter> 是一个文档,指定要按哪些字段筛选列表。

例子

以下示例显示了未经筛选的请求的返回值:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state""running"的情况,您将看到以下输出:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

要使用 mongosh 将现有流处理器中的采样结果数组返回到 STDOUT,请使用 sp.<streamprocessor>.sample() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令示例来自名为 proc01 的流处理器。

sp.proc01.sample()

此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要查看流处理器的统计信息:

Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。

获取一个流处理器

要使用 mongosh 返回一份总结现有流处理器当前状态的文档,请使用 sp.<streamprocessor>.stats() 方法。streamprocessor 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。 它采用以下语法:

sp.<streamprocessor>.stats({options: {<options>}})

其中options是具有以下字段的可选文档:

字段
类型
说明

scale

整型

用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定scale1024

verbose

布尔

指定输出文档详细程度的标志。 如果设置为true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false

输出文档包含以下字段:

字段
类型
说明

ns

字符串

定义流处理器的命名空间。

stats

对象

描述流处理器操作状态的文档。

stats.name

字符串

流处理器的名称。

stats.status

字符串

流处理器的状态。 此字段可为以下值:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

整型

大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。

stats.inputMessageCount

整型

发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。

stats.inputMessageSize

整型

发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。

stats.outputMessageCount

整型

该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。

stats.outputMessageSize

整型

流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。

stats.dlqMessageCount

整型

stats.dlqMessageSize

整型

stats.changeStreamTimeDifferenceSecs

整型

最新变更流恢复令牌所代表的事件时间与 oplog 中最新事件之间的时间差(以秒为单位)。

stats.changeStreamState

token

最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。

stats.stateSize

整型

Windows 用于存储处理器状态的字节数。

stats.watermark

整型

当前水印的时间戳。

stats.operatorStats

阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats 提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

stats.operatorStats 包括以下唯一字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

整型

操作符的最大内存使用量(以字节或千字节为单位)。

stats.operatorStats.executionTime

整型

操作符的总执行时间(以秒为单位)。

stats.minOpenWindowStartTime

日期

The start time of the minimum open 窗口.此值是可选的。

stats.maxOpenWindowStartTime

日期

The start time of the maximum open 窗口.此值是可选的。

stats.kafkaPartitions

阵列

stats.kafkaPartitions.partition

整型

Apache Kafka 主题分区编号.

stats.kafkaPartitions.currentOffset

整型

流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1

stats.kafkaPartitions.checkpointOffset

整型

流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。

stats.kafkaPartitions.isIdle

布尔

指示分区是否空闲的标志。此值默认为 false

例如,以下显示了名为inst01的Atlas Stream Processing实例上名为proc01的流处理器的状态,其中项大小以 KB 为单位:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

后退

管理连接