托管流处理器
在此页面上
Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。
先决条件
要创建和管理流处理器,您必须具备:
mongosh
2.0或更高版本具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器Atlas 集群
Considerations
许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( -
) 或句点 ( .
),则必须将名称用方括号 ( []
) 和双引号 ( ""
) 括在方法调用,如sp.["special-name-stream"].stats()
中。
以交互方式创建流处理器
您可以使用 sp.process()
方法交互式创建流处理器。您以交互方式创建的流处理器会表现出以下行为:
将输出和死信队列文档写入shell
创建后立即开始运行
运行 10 分钟或直到用户停止它们
停止后不要继续
您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。
sp.process()
通过以下语法实现:
sp.process(<pipeline>)
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
pipeline | 阵列 | 必需 | 要应用于流媒体数据的管道。 |
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner
的用户身份连接到Atlas Stream Processing实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
创建流处理器
要创建流处理器:
Atlas Administration API 提供了一个用于创建流处理器的端点。
要使用mongosh
创建新的流处理器,请使用sp.createStreamProcessor()
方法。 它具有以下语法:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | 类型 | 必要性 | 说明 |
---|---|---|---|
name | 字符串 | 必需 | 流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。 |
pipeline | 阵列 | 必需 | 要应用于流媒体数据的管道。 |
options | 对象 | Optional | 为流处理器定义各种可选设置的对象。 |
options.dlq | 对象 | 可选的 | 为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义 options 字段,则此字段是必需的。 |
options.dlq.connectionName | 字符串 | 可选的 | 人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义 options.dlq 字段,则此字段是必需的。 |
options.dlq.db | 字符串 | 可选的 | options.dlq.connectionName 中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq 字段,则此字段是必需的。 |
options.dlq.coll | 字符串 | 可选的 | options.dlq.db 中指定的数据库中的集合名称。 如果您定义options.dlq 字段,则此字段是必需的。 |
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner
的用户身份连接到Atlas Stream Processing实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
启动流处理器
要启动流处理器:
停止流处理器
要停止流处理器,请执行以下操作:
删除流处理器
要删除流处理器,请执行以下操作:
列出可用的流处理器
列出所有可用的流处理器:
Atlas Administration API 提供了一个端点,用于列出所有可用的流处理器。
要使用 mongosh
方法列出当前流处理实例上所有可用的流处理器,请使用 sp.listStreamProcessors()
方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它采用以下语法:
sp.listStreamProcessors(<filter>)
<filter>
是一个文档,指定要按哪些字段筛选列表。
例子
以下示例显示了未经筛选的请求的返回值:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state"
为"running"
的情况,您将看到以下输出:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
来自流处理器的样本
要使用 mongosh
将现有流处理器中的采样结果数组返回到 STDOUT
,请使用 sp.<streamprocessor>.sample()
方法。<streamprocessor>
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令示例来自名为 proc01
的流处理器。
sp.proc01.sample()
此命令将持续运行,直到您使用 CTRL-C
取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage
文档中报告示例中的无效文档:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。
查看流处理器的统计信息
要查看流处理器的统计信息:
Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。
要使用 mongosh
返回一份总结现有流处理器当前状态的文档,请使用 sp.<streamprocessor>.stats()
方法。streamprocessor
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。 它采用以下语法:
sp.<streamprocessor>.stats({options: {<options>}})
其中options
是具有以下字段的可选文档:
字段 | 类型 | 说明 |
---|---|---|
scale | 整型 | 用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定 scale 为1024 。 |
verbose | 布尔 | 指定输出文档详细程度的标志。 如果设置为 true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false 。 |
输出文档包含以下字段:
字段 | 类型 | 说明 |
---|---|---|
ns | 字符串 | 定义流处理器的命名空间。 |
stats | 对象 | 描述流处理器操作状态的文档。 |
stats.name | 字符串 | 流处理器的名称。 |
stats.status | 字符串 | 流处理器的状态。 此字段可为以下值:
|
stats.scaleFactor | 整型 | 大小字段的显示比例。如果设置为 1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。 |
stats.inputMessageCount | 整型 | 发布到流的文档数量。 文档在通过 $source 阶段(而不是通过整个管道时)才被视为已“发布”到流。 |
stats.inputMessageSize | 整型 | 发布到流的字节数或千字节数。 字节在经过 $source 阶段(而不是经过整个管道时)被视为已“发布”到流。 |
stats.outputMessageCount | 整型 | 该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。 |
stats.outputMessageSize | 整型 | 流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。 |
stats.dlqMessageCount | 整型 | |
stats.dlqMessageSize | 整型 | |
stats.changeStreamTimeDifferenceSecs | 整型 | |
stats.changeStreamState | token | 最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。 |
stats.stateSize | 整型 | Windows 用于存储处理器状态的字节数。 |
stats.watermark | 整型 | 当前水印的时间戳。 |
stats.operatorStats | 阵列 | 处理器管道中每个操作符的统计信息。 仅当您传入
此外,
|
stats.operatorStats.maxMemoryUsage | 整型 | 操作符的最大内存使用量(以字节或千字节为单位)。 |
stats.operatorStats.executionTime | 整型 | 操作符的总执行时间(以秒为单位)。 |
stats.kafkaPartitions | 阵列 | |
stats.kafkaPartitions.partition | 整型 | Apache Kafka 主题分区编号. |
stats.kafkaPartitions.currentOffset | 整型 | 流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上 1 。 |
stats.kafkaPartitions.checkpointOffset | 整型 | 流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。 |
例如,以下显示了名为inst01
的Atlas Stream Processing实例上名为proc01
的流处理器的状态,其中项大小以 KB 为单位:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }