Docs 菜单
Docs 主页
/
MongoDB Atlas
/

开始使用 Atlas Stream Processing

在此页面上

  • 先决条件
  • 步骤
  • 在Atlas中, Go项目的 Stream Processing 页面。
  • 创建Atlas Stream Processing实例。
  • 获取Atlas Stream Processing实例连接string 。
  • 将 MongoDB Atlas 连接添加到连接注册表。
  • 验证您的流数据源是否发出消息。
  • 创建持久流处理器。
  • 启动流处理器。
  • 验证流处理器的输出。
  • 删除流处理器。
  • 后续步骤

本教程将引导您完成设置 Atlas Stream Processing 和运行第一个流处理器的步骤。

要完成本教程,您需要:

  • Atlas 项目

  • mongosh 2.0或更高版本

  • 具有 Project OwnerProject Stream Processing Owner角色的 Atlas 用户,用于管理流处理实例和连接注册表

    注意

    Project Owner角色允许您创建数据库部署、管理项目访问和项目设置、管理 IP 访问列表条目等。

    Project Stream Processing Owner角色支持 Atlas Stream Processing 操作,例如查看、创建、删除和编辑流处理实例,以及查看、添加、修改和删除连接注册表中的连接。

    请参阅项目角色,详细了解这两个角色之间的区别。

  • 具有atlasAdmin角色的数据库用户,用于创建和运行流处理器

  • Atlas 集群

1
  1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。

  2. 如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。

  3. 在侧边栏中,单击Services标题下的Stream Processing

    此时会显示“流处理”页面。

2
  1. 点击右下角的Get Started 。 Atlas 简要说明了 Atlas Stream Processing 的核心组件。

  2. 单击 Create instance 按钮。

  3. Create a stream processing instance页面上,按如下方式配置实例:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. 单击 Create(连接)。

3
  1. 找到Atlas Stream Processing实例的概述面板,然后单击 Connect

  2. 选择 I have the MongoDB shell installed

  3. Select your mongo shell version下拉菜单中,选择最新版本的mongosh

  4. 复制 Run your connection string in your command line 下提供的连接string 。 您将在后续步骤中用到它。

  5. 单击 Close(连接)。

4

此连接用作我们的流数据接收器。

  1. 在Atlas Stream Processing实例的窗格中,单击 Configure

  2. Connection Registry标签页中,单击右上角的+ Add Connection

  3. 单击Atlas Database 。 在Connection Name字段中,输入mongodb1 。 从Atlas Cluster下拉列表中,选择一个未存储任何数据的 Atlas 集群。

  4. 单击 Add connection(连接)。

5

您的流处理实例预先配置了与名为sample_stream_solar的示例数据源的连接。该源生成来自各种太阳能设备的一系列报告。每份报告都描述了在特定时间点观察到的单个太阳能设备的瓦数和温度,以及该设备的最大瓦数。

以下文档是一个代表性示例。

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
},
_ts: ISODate('2024-08-12T21:41:01.788Z'),
_stream_meta: {
source: {
type: 'generated'
}
}
}

要验证此源是否发出消息,请以交互方式创建流处理器。

  1. 打开您选择的终端应用程序。

  2. 使用 连接到Atlas Stream Processing mongosh实例。

    将您在上一步中复制的 连接mongosh 粘贴到终端,其中string<atlas-stream-processing-url> URL是Atlas Stream Processing 实例的 ,<username> 是具有atlasAdmin 角色的用户。

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    根据提示输入密码。

  3. 创建流处理器。

    将以下代码复制到mongosh提示符中:

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    验证来自sample_stream_solar连接的数据是否显示在控制台上,并终止该进程。

    使用sp.process()创建的流处理器在终止后不会持续存在。

6

使用聚合管道,您可以在摄取每个文档时对其进行转换。以下聚合管道以一秒为间隔导出每个太阳能设备的最高温度以及平均值、中值、最大和最小瓦数。

  1. 配置$source阶段。

    以下$source阶段从sample_stream_solar源摄取数据,并将 Atlas Stream Processing 时间字段值设置为等于源的timestamp字段的值。

    let s = {
    $source: {
    connectionName: "sample_stream_solar",
    timeField: {
    $dateFromString: {
    dateString: '$timestamp'
    }
    }
    }
    }
  2. 配置$group阶段。

    以下$group阶段根据group_id组织所有传入数据,为每个group_id累加所有文档的obs.tempobs.watts字段的值,然后派生所需的数据。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $avg: "$obs.temp"
    },
    avg_watts: {
    $min: "$obs.watts"
    },
    median_watts: {
    $min: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  3. 配置$tumblingWindow阶段。

    为了对流数据执行$group等累加操作,Atlas Stream Processing 使用窗口来限制数据集。以下$tumblingWindow阶段将流分成连续的10秒间隔。

    这意味着,举例来说,当$group阶段计算median_watts的值时,它会获取在前10秒内摄取的具有给定group_id的所有文档的obs.watts值。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  4. 配置$merge阶段。

    $merge 允许您将处理后的流数据写入 Atlas 数据库。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  5. 创建流处理器。

    为新的流处理器指定名称,并通过按顺序列出每个阶段来声明其聚合管道。 $group阶段属于$tumblingWindow的嵌套管道,您不得将其包含在处理器管道定义中。

    sp.createStreamProcessor("solarDemo", [s, t, m])

这将创建一个名为solarDemo的流处理器,它应用之前定义的查询并将处理后的数据写入您连接到的集群上的solarDb数据库的solarColl集合。它会返回从太阳能设备的10秒观测间隔中获得的各种测量值。

要详细了解 Atlas Stream Processing 如何写入静态数据库,请参阅$merge

7

mongosh中运行以下命令:

sp.solarDemo.start()
8

要验证处理器是否处于活动状态,请在mongosh中运行以下命令:

sp.solarDemo.stats()

此命令报告solarDemo流处理器的操作统计信息。

要验证流处理器是否正在将数据写入 Atlas 集群,请执行以下操作:

  1. 在 Atlas 中,进入项目的 Clusters 页面。

    1. 如果尚未显示,请选择包含所需项目的组织导航栏中的Organizations菜单。

    2. 如果尚未显示,请从导航栏的Projects菜单中选择所需的项目。

    3. 如果 Clusters(数据库部署)页面尚未出现,请单击侧边栏中的 Database(数据库)。

      此时会显示“集群”页面。

  2. 单击集群的对应 Browse Collections 按钮。

    显示数据浏览器

  3. 查看MySolar集合。

或者,您可以使用mongosh在终端中显示已处理文档的样本:

sp.solarDemo.sample()
{
_id: 10,
max_watts: 136,
min_watts: 130,
avg_watts: 133,
median_watts: 130,
max_temp: 7,
_stream_meta: {
source: {
type: 'generated'
},
window: {
start: ISODate('2024-08-12T22:49:05.000Z'),
end: ISODate('2024-08-12T22:49:10.000Z')
}
}
}

注意

以上是一个具有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

9

mongosh中运行以下命令:

sp.avgWatts.drop()

要确认已删除avgWatts ,请列出所有可用的流处理器:

sp.listStreamProcessors()

了解如何:

后退

Overview