托管流处理器
在此页面上
Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。
先决条件
要创建和管理流处理器,您必须具备:
mongosh
2.0或更高版本具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器Atlas 集群
Considerations
许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( -
) 或句点 ( .
),则必须将名称用方括号 ( []
) 和双引号 ( ""
) 括在方法调用,如sp.["special-name-stream"].stats()
中。
以交互方式创建流处理器
您可以使用 sp.process()
方法交互式创建流处理器。您以交互方式创建的流处理器会表现出以下行为:
将输出和死信队列文档写入shell
创建后立即开始运行
运行 10 分钟或直到用户停止它们
停止后不要继续
您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。
sp.process()
通过以下语法实现:
sp.process(<pipeline>)
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 阵列 | 必需 | 要应用于流媒体数据的管道。 |
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 x.059 身份验证,以名为 streamOwner
的用户身份连接到流处理实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
创建流处理器
要创建流处理器:
Atlas Administration API 提供了一个用于创建流处理器的端点。
要使用mongosh
创建新的流处理器,请使用sp.createStreamProcessor()
方法。 它具有以下语法:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。 |
| 阵列 | 必需 | 要应用于流媒体数据的管道。 |
| 对象 | Optional | 为流处理器定义各种可选设置的对象。 |
| 对象 | 可选的 | 为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义 |
| 字符串 | 可选的 | 人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义 |
| 字符串 | 可选的 |
|
| 字符串 | 可选的 |
|
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 x.059 身份验证,以名为 streamOwner
的用户身份连接到流处理实例。
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
启动流处理器
要启动流处理器:
停止流处理器
要停止流处理器:
修改流处理器
您可以修改现有流处理器的以下元素:
要修改流处理器,请执行以下操作:
默认,修改后的处理器会从上一个检查点恢复。或者,您可以设立resumeFromCheckpoint=false
,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上完全重新计算。
限制
启用默认设置 resumeFromCheckpoint=true
后,存在以下应用:
您无法修改
$source
阶段。您无法修改窗口的时间间隔。
您无法删除窗口。
只有当窗口的内部管道具有
$group
或$sort
阶段时,您才能修改带有窗口的管道。您无法更改现有的窗口类型。示例,您不能将
$tumblingWindow
更改为$hoppingWindow
,反之亦然。带窗口的处理器可能会重新处理某些数据,作为重新计算窗口的结果。
要修改流处理器,请执行以下操作:
需要mongosh
v2.3.4 +。
使用 sp.<streamprocessor>.modify()
命令修改现有流处理器。 <streamprocessor>
必须是为当前流处理实例定义的已停止流处理器的名称。
示例,要修改名为 proc01
的流处理器,请运行以下命令:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
向现有管道添加阶段
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
修改流处理器的输入源
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
从流处理器中删除死信队列
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
修改带窗口的流处理器
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
Atlas Administration API提供了用于修改流处理器的端点。
删除流处理器
要删除流处理器:
列出可用的流处理器
列出所有可用的流处理器:
Atlas Administration API 提供了一个用于列出所有可用流处理器的端点。
要使用 mongosh
方法列出当前流处理实例上所有可用的流处理器,请使用 sp.listStreamProcessors()
方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它采用以下语法:
sp.listStreamProcessors(<filter>)
<filter>
是一个文档,指定要按哪些字段筛选列表。
例子
以下示例显示了未经筛选的请求的返回值:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state"
为"running"
的情况,您将看到以下输出:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
来自流处理器的样本
要使用 mongosh
将现有流处理器中的采样结果数组返回到 STDOUT
,请使用 sp.<streamprocessor>.sample()
方法。<streamprocessor>
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令示例来自名为 proc01
的流处理器。
sp.proc01.sample()
此命令将持续运行,直到您使用 CTRL-C
取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage
文档中报告示例中的无效文档:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。
查看流处理器的统计信息
要查看流处理器的统计信息:
Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。
要使用 mongosh
返回一份总结现有流处理器当前状态的文档,请使用 sp.<streamprocessor>.stats()
方法。streamprocessor
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。 它采用以下语法:
sp.<streamprocessor>.stats({options: {<options>}})
其中options
是具有以下字段的可选文档:
字段 | 类型 | 说明 |
---|---|---|
| 整型 | 用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定 |
| 布尔 | 指定输出文档详细程度的标志。 如果设置为 |
输出文档包含以下字段:
字段 | 类型 | 说明 |
---|---|---|
| 字符串 | 定义流处理器的命名空间。 |
| 对象 | 描述流处理器操作状态的文档。 |
| 字符串 | 流处理器的名称。 |
| 字符串 | 流处理器的状态。 此字段可为以下值:
|
| 整型 | 大小字段的显示比例。如果设置为 |
| 整型 | 发布到流的文档数量。 文档在通过 |
| 整型 | 发布到流的字节数或千字节数。 字节在经过 |
| 整型 | 该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | 流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | |
| 整型 | |
| 整型 | |
| token | 最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。 |
| 整型 | Windows 用于存储处理器状态的字节数。 |
| 整型 | 当前水印的时间戳。 |
| 阵列 | 处理器管道中每个操作符的统计信息。 仅当您传入
|
| 整型 | 操作符的最大内存使用量(以字节或千字节为单位)。 |
| 整型 | 操作符的总执行时间(以秒为单位)。 |
| 日期 | The start time of the minimum open 窗口.此值是可选的。 |
| 日期 | The start time of the maximum open 窗口.此值是可选的。 |
| 阵列 | Apache Kafka代理分区的偏移量信息。 |
| 整型 | Apache Kafka 主题分区编号. |
| 整型 | 流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上 |
| 整型 | 流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。 |
| 布尔 | 指示分区是否空闲的标志。此值默认为 |
例如,以下显示了名为inst01
的Atlas Stream Processing实例上名为proc01
的流处理器的状态,其中项大小以 KB 为单位:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }