$lookup
定义
$lookup执行从 $source
到连接注册表中的 Atlas 集合的消息流的左外连接。
根据您的使用案例, $lookup
管道阶段使用以下三种语法之一:
要了解更多信息,请参阅$lookup 语法。
警告
使用 $lookup
扩充流可能会降低流处理速度。
以下原型表单展示了所有可用字段:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>" } }
语法
$lookup
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
from | 文档 | 必需 | 用于指定 Atlas 数据库中要加入到来自 $source 的消息的集合的文档。您必须指定连接注册表中的集合。您必须为此文档中的所有字段指定一个值。 |
from.connectionName | 字符串 | 必需 | 连接注册表中的连接名称。 |
from.db | 字符串 | 必需 | 包含您要加入的集合的 Atlas 数据库名称。 |
from.coll | 字符串 | 必需 | 您想要加入的集合的名称。 |
localField | 字符串 | 可选的 | 要联接的 该字段是以下语法的一部分: |
foreignField | 字符串 | 可选的 | 要联接的 该字段是以下语法的一部分: |
let | 文档 | 可选的 | 该字段是以下语法的一部分: |
管道 | 文档 | 可选的 | 指定要在已联接集合上运行的 该字段是以下语法的一部分: |
作为 | 字符串 | 必需 | 要添加到输入文档中的新数组字段的名称。这个新数组字段包含 from 集合中的匹配文档。如果指定名称已作为字段存在于输入文档中,则该字段将被覆盖。 |
行为
$lookup的 Atlas Stream Processing 版本对来自$source
的消息和指定 Atlas 集合中的文档执行左外连接。此版本的行为与标准 MongoDB 数据库中提供的$lookup
阶段类似。但是,此版本要求您指定连接注册表中的 Atlas 集合作为from
字段的值。
管道可以包含嵌套的 $lookup
阶段。如果在管道中包含嵌套的 $lookup
阶段,则必须使用标准 from
语法,在与外部 $lookup
阶段相同的远程 Atlas 连接中指定一个集合。
例子
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
如果您的管道在同一集合上同时具有$lookup
和$merge
,则当您尝试维护增量视图时,Atlas Stream Processing 结果可能会有所不同。 Atlas Stream Processing 同时处理多个源消息,然后将它们全部合并在一起。如果多条消息具有相同的 ID( $lookup
和$merge
都使用该 ID,则 Atlas Stream Processing 可能会返回尚未实现的结果)。
例子
考虑以下输入流:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
假设查询包含管道内的以下内容:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
如果尝试保持增量视图,则可能会看到类似于以下内容的结果:
{ _id: 1, count: 5 }
但是,Atlas Stream Processing 可能会返回 5
或 3
的计数,具体取决于 Atlas Stream Processing 是否处理了文档。
有关更多信息,请参阅 $lookup
。