Docs 菜单
Docs 主页
/
MongoDB Atlas
/

托管流处理器

在此页面上

  • 先决条件
  • Considerations
  • 以交互方式创建流处理器
  • 连接到您的Atlas Stream Processing实例。
  • 定义管道。
  • 创建流处理器。
  • 创建流处理器
  • 启动流处理器
  • 停止流处理器
  • 删除流处理器
  • 列出可用的流处理器
  • 来自流处理器的样本
  • 查看流处理器的统计信息

Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

您可以使用 sp.process() 方法交互式创建流处理器。您以交互方式创建的流处理器会表现出以下行为:

  • 将输出和死信队列文档写入shell

  • 创建后立即开始运行

  • 运行 10 分钟或直到用户停止它们

  • 停止后不要继续

您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。

sp.process() 通过以下语法实现:

sp.process(<pipeline>)
字段
类型
必要性
说明
pipeline
阵列
必需
要应用于流媒体数据的管道
1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner的用户身份连接到Atlas Stream Processing实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

以下命令创建一个流处理器,该处理器应用pipeline中定义的逻辑。

sp.process(pipeline)

要创建流处理器:

Atlas Administration API 提供了一个用于创建流处理器的端点。

创建一个流处理器

要使用mongosh创建新的流处理器,请使用sp.createStreamProcessor()方法。 它具有以下语法:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
类型
必要性
说明
name
字符串
必需
流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。
pipeline
阵列
必需
要应用于流媒体数据的管道
options
对象
Optional
为流处理器定义各种可选设置的对象。
options.dlq
对象
可选的
为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义options字段,则此字段是必需的。
options.dlq.connectionName
字符串
可选的
人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。
options.dlq.db
字符串
可选的
options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。
options.dlq.coll
字符串
可选的
options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。
1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner的用户身份连接到Atlas Stream Processing实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh提示符中,分配一个包含以下 DLQ 属性的对象:

  • connectionName

  • 数据库名称

  • 集合名称

以下示例通过metadata.dlq collection 中的cluster01连接定义了一个 DLQ。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

以下命令创建一个名为proc01的流处理器,它应用pipeline中定义的逻辑。 在处理中引发错误的文档将写入deadLetter中定义的 DLQ。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

要启动流处理器:

Atlas Administration API 提供了一个用于启动流处理器的端点。

启动一个流处理器

要使用 mongosh 启动现有流处理器,请使用 sp.<streamprocessor>.start() 方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要启动名为proc01的流处理器,请运行以下命令:

sp.proc01.start()

此方法返回:

  • true 如果流处理器存在并且当前未运行。

  • false 如果您尝试启动不存在或存在但当前正在运行的流处理器。

要停止流处理器,请执行以下操作:

Atlas Administration API 提供了一个用于停止流处理器的端点。

停止一个流处理器

要使用 mongosh 停止现有的流处理器,请使用 sp.<streamprocessor>.stop() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。

例如,要停止名为proc01的流处理器,请运行以下命令:

sp.proc01.stop()

此方法返回:

  • true 如果流处理器存在且当前正在运行。

  • false 如果流处理器不存在,或者流处理器当前未运行。

要删除流处理器,请执行以下操作:

Atlas Administration API 提供了一个用于删除流处理器的端点。

删除一个流处理器

要使用 mongosh 删除现有的流处理器,请使用sp.<streamprocessor>.drop()方法。<streamprocessor> 必须是为当前流处理实例定义的流处理器的名称。

例如,要删除名为proc01的流处理器,请运行以下命令:

sp.proc01.drop()

此方法返回:

  • true 如果流处理器存在。

  • false 如果流处理器不存在。

删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。

列出所有可用的流处理器:

Atlas Administration API 提供了一个端点,用于列出所有可用的流处理器。

删除一个流处理器

要使用 mongosh 方法列出当前流处理实例上所有可用的流处理器,请使用 sp.listStreamProcessors()方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它采用以下语法:

sp.listStreamProcessors(<filter>)

<filter> 是一个文档,指定要按哪些字段筛选列表。

例子

以下示例显示了未经筛选的请求的返回值:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state""running"的情况,您将看到以下输出:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

要使用 mongosh 将现有流处理器中的采样结果数组返回到 STDOUT,请使用 sp.<streamprocessor>.sample() 方法。<streamprocessor> 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令示例来自名为 proc01 的流处理器。

sp.proc01.sample()

此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。

要查看流处理器的统计信息:

Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。

获取一个流处理器

要使用 mongosh 返回一份总结现有流处理器当前状态的文档,请使用 sp.<streamprocessor>.stats() 方法。streamprocessor 必须是为当前流处理实例定义的当前正在运行的流处理器的名称。 它采用以下语法:

sp.<streamprocessor>.stats({options: {<options>}})

其中options是具有以下字段的可选文档:

字段
类型
说明
scale
整型
用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定scale1024
verbose
布尔
指定输出文档详细程度的标志。 如果设置为true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false

输出文档包含以下字段:

字段
类型
说明
ns
字符串
定义流处理器的命名空间。
stats
对象
描述流处理器操作状态的文档。
stats.name
字符串
流处理器的名称。
stats.status
字符串

流处理器的状态。 此字段可为以下值:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
整型
大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。
stats.inputMessageCount
整型
发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。
stats.inputMessageSize
整型
发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。
stats.outputMessageCount
整型
该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。
stats.outputMessageSize
整型
流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。
stats.dlqMessageCount
整型
stats.dlqMessageSize
整型
stats.changeStreamTimeDifferenceSecs
整型
最新变更流恢复令牌所代表的事件时间与 oplog 中最新事件之间的时间差(以秒为单位)。
stats.changeStreamState
token
最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。
stats.stateSize
整型
Windows 用于存储处理器状态的字节数。
stats.watermark
整型
当前水印的时间戳。
stats.operatorStats
阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats 提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

此外, stats.operatorStats还包括以下唯一字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
整型
操作符的最大内存使用量(以字节或千字节为单位)。
stats.operatorStats.executionTime
整型
操作符的总执行时间(以秒为单位)。
stats.kafkaPartitions
阵列
stats.kafkaPartitions.partition
整型
Apache Kafka 主题分区编号.
stats.kafkaPartitions.currentOffset
整型
流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1
stats.kafkaPartitions.checkpointOffset
整型
流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。

例如,以下显示了名为inst01的Atlas Stream Processing实例上名为proc01的流处理器的状态,其中项大小以 KB 为单位:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

后退

管理连接