Docs 菜单
Docs 主页
/
MongoDB Atlas
/

Atlas Stream Processing 概述

在此页面上

  • 流媒体数据
  • 架构
  • 流处理器的结构
  • 检查点
  • 死信队列(DLQ)
  • Atlas Stream Processing 区域
  • 后续步骤

Atlas Stream Processing 使您能够使用与 Atlas 数据库相同的 query API处理复杂数据流。Atlas Stream Processing 允许您:

  • 构建聚合管道,以便对流数据进行连续操作,而不会出现批处理固有的延迟。

  • 执行持续的模式验证,以检查消息的格式是否正确,检测消息损坏并检测延迟到达的数据。

  • 持续将结果发布到 Atlas 集合或 Apache Kafka 集群,确保最新的数据视图和分析。

Atlas Stream Processing 组件直接属于 Atlas 项目,并独立于 Atlas 集群运行。

流是源自一个或多个源的连续数据流,采用仅追加日志的形式。数据流示例包括传感器的温度或压力读数、金融交易记录或变更数据捕获事件。

数据流源自 Apache Kafka 主题 来源 或 变更流 。然后,您可以将处理后的数据写入 接收 器,例如 Apache Kafka 或 Atlas 集合。

数据流源自状态快速变化的系统。 Atlas Stream Processing 提供原生流处理功能,可对连续数据进行操作,而不受静态数据库的时间和计算限制。

Atlas Stream Processing 的核心抽象是流处理器。 流处理器是一种 MongoDB聚合管道查询,它对来自指定源的流数据持续运行,并将输出写入接收器。 要了解更多信息,请参阅流处理器的结构。

流处理在流处理实例上进行。每个流处理实例都是一个关联以下内容的 Atlas 命名空间:

  • 一个或多个工作线程,用于提供运行流处理器所需的 RAM 和 CPU。

  • 云提供商和云区域。

  • 连接注册表,用于存储流媒体数据的可用源和接收器列表。

  • 定义用户授权的安全上下文。

  • 实例本身的 string连接 。Atlas Stream Processing

定义流处理器后,该处理器只能在定义它的流处理实例中使用。每个工作节点最多可以承载四个正在运行的流处理器;当您启动流处理器时,Atlas Stream Processing 会根据需要配置工作节点,从而自动扩展您的流处理实例。您可以通过停止工作节点上的所有流处理器来取消预配该工作节点。Atlas Stream Processing 始终倾向于将流处理器分配给现有工作节点,而不是配置新的工作节点。

例子

您有一个Atlas Stream Processing实例,运行八个流处理器,分别命名为 proc01proc08proc01proc04在一个工作线程上运行, proc05proc08在第二个工作线程上运行。 您启动一个名为proc09的新流处理器。 Atlas Stream Processing 预配第三个工作线程来托管proc09

后来,您在第一个工作线程上停止了 proc03。当您停止 proc09 并重新启动它时,Atlas Stream Processing 会将 proc09 重新分配给第一个工作线程,并取消配置第三个工作线程。

如果您在停止并重新启动 proc09 之前启动名为 proc10 的新流处理器,则 Atlas Stream Processing 会将 proc10 分配给先前分配给 proc03 的槽位中的第一个工作线程。

在扩展时,Atlas Stream Processing 仅考虑当前正在运行的流处理器的数量;它不计算未运行的定义流处理器。流处理实例的层级决定了其工作线程的 RAM 和 CPU 分配。

连接注册表存储一个或多个连接。每个连接都会为网络和安全细节的组合分配一个名称,从而允许流处理器与外部服务进行交互。连接表现出以下行为:

  • 只有在特定流处理实例的连接注册表中定义的连接才能为该流处理实例上托管的流处理器提供服务。

  • 每个连接可以为任意数量的流处理器提供服务

  • 只有一个连接可以作为给定流处理器的源。

  • 只有单个连接可以用作给定流处理器的接收器。

  • 连接并非天生就被定义为源或接收器。任何给定的连接都可以提供任一功能,具体取决于流处理器如何调用该连接。

Atlas Stream Processing在多租户基础架构上的专用客户容器中运行Atlas Stream Processing工作线程。 有关 MongoDB 安全性和合规性的更多信息,请参阅MongoDB 信任中心。

流处理器采用聚合管道的形式。每个处理器从 $source 阶段开始,该阶段连接到源,并开始接收文档形式的连续数据流。这些文档必须是有效的 jsonejson$source 之后的每个聚合阶段依次使用流中的每条记录,可以分为三种类型:

  • 验证$validate 阶段允许您对摄取的文档执行模式验证,以确保只有格式正确的文档才能继续进行进一步处理,并确定如何处理格式不正确的文档。验证是可选的。

  • 无状态操作:可直接作用于输入数据流的聚合阶段或操作符。这些聚合依次消耗、转换和传递每个文档,可以出现在 $source$emit$merge 阶段之间的任何位置。

  • 有状态操作:只能作用于有界文档集的聚合阶段或操作符。这些聚合同时消耗、转换和传递整组文档,并且只能出现在窗口内。

Windows 是管道阶段,它使用流数据并将其分区为按时间分隔的数据集,以便您可以应用不适用于无限数据的阶段和操作符,例如$group$avg 。 每个流处理器只能有一个窗口阶段。

处理摄取的数据后,流处理器会使用 $emit 阶段将其写入流媒体数据平台,或使用 $merge 阶段将其写入 Atlas 数据库。这些阶段彼此互斥,流处理器只能有一个这样的阶段。

Atlas Stream Processing 使用检查点文档捕获流处理器的状态。这些文档具有唯一的 ID,并受流处理器逻辑流的约束。当流处理器的最后一个操作符完成对检查点文档的操作时,Atlas Stream Processing 会提交该检查点,生成两种类型的记录:

  • 验证检查点 ID 及其所属流处理器的单个记录

  • 一组记录,描述 Atlas Stream Processing 提交检查点时相关流处理器中每个有状态操作的状态。

当您在中断后重新启动流处理器时,Atlas Stream Processing 会查询最后提交的检查点并从所述状态恢复操作。

Atlas Stream Processing 支持将 Atlas 数据库collection用作死信队列(DLQ)。当 Atlas Stream Processing 无法处理数据流中的文档时,它会将文档内容以及处理失败的详细信息写入 DLQ。 您可以在流处理器定义中将collection指定为 DLQ。

要了解更多信息,请参阅创建流处理器。

Atlas Stream Processing支持在 AWS 和Azure上创建流处理实例。有关可用区域的列表,请参阅以下内容的流处理实例部分:

流处理器可以读取和写入托管在不同云提供商或不同区域的集群,前提是它们与托管流处理实例位于同一项目中。

有关 Atlas Stream Processing 核心概念的更多详细信息,请阅读以下内容:

后退

Atlas Stream Processing