Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$lookup

在此页面上

  • 定义
  • 语法
  • 行为
  • 示例

$lookup 阶段将来自 $source 的消息流与连接注册表中的 Atlas 集合进行左外连接。

根据您的使用案例,$lookup 管道阶段将使用以下三种语法之一:

要了解更多信息,请参阅$lookup 语法。

警告

使用 $lookup 扩充流可能会降低流处理速度。

以下原型表单展示了所有可用字段:

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>"
}
}

$lookup 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

from

文档

可选的

用于指定Atlas数据库中要加入到来自$source 的消息的集合的文档。您必须仅指定连接注册表中的集合。

如果指定此字段,则必须指定此文档中所有字段的值。

如果您指定 pipeline字段。

from.connectionName

字符串

可选的

连接注册表中的连接名称。

如果您指定 pipeline字段。

from.db

字符串

可选的

包含您要加入的集合的 Atlas 数据库名称。

如果您指定 pipeline字段。

from.coll

字符串

可选的

您想要加入的集合的名称。

如果您指定 pipeline字段。

localField

字符串

可选的

要联接的 $source 信息中的字段。

该字段是以下语法的一部分:

foreignField

字符串

可选的

要联接的 from 集合中文档的字段。

该字段是以下语法的一部分:

let

文档

可选的

指定各个管道阶段中使用的变量。要了解详情,请参阅 let。

该字段是以下语法的一部分:

管道

文档

可选的

指定要在已联接集合上运行的 pipeline。要了解更多信息,请参阅管道

该字段是以下语法的一部分:

作为

字符串

必需

要添加到输入文档中的新数组字段的名称。这个新数组字段包含 from 集合中的匹配文档。如果指定名称已作为字段存在于输入文档中,则该字段将被覆盖。

Atlas Stream Processing 版本的 $lookup 对来自 $source 的消息和指定 Atlas 集合中的文档执行左外连接。此版本的行为类似于标准 MongoDB 数据库中提供的 $lookup 阶段。但是,此版本要求您将连接注册表中的 Atlas 集合指定为 from 字段的值。

管道可以包含嵌套的 $lookup 阶段。如果在管道中包含嵌套的 $lookup 阶段,则必须使用标准 from 语法,在与外部 $lookup 阶段相同的远程 Atlas 连接中指定一个集合。

例子

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

如果管道在同一集合上同时具有 $lookup$merge,而您尝试维护增量视图,Atlas Stream Processing 结果可能会有所不同。Atlas Stream Processing 同时处理多个源消息,然后将它们合并在一起。如果多个消息具有 $lookup$merge 都使用的相同 ID,则 Atlas Stream Processing 可能会返回尚未物化的结果。

例子

考虑以下输入流:

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

假设查询包含管道内的以下内容:

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

如果尝试保持增量视图,则可能会看到类似于以下内容的结果:

{ _id: 1, count: 5 }

但是,Atlas Stream Processing 可能会返回 53 的计数,具体取决于 Atlas Stream Processing 是否处理了文档。

有关更多信息,请参阅 $lookup

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。名为 humidity_descriptions 的集合包含以下形式的文档:

其中,relative_humidity 字段描述了室温( 20 摄氏度)下的相对湿度, condition列出了适合该湿度水平的语言描述符。您可以使用 $lookup 阶段,用推荐的描述符来丰富流媒体天气报告,方便气象学家在天气预报广播中使用。

以下聚合有四个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $lookup 阶段将 humidity_descriptions 数据库中的记录合并到 dewPoint 字段的天气报告中。

  3. $match 阶段会排除 humidity_info 字段为空的文档,并将 humidity_info 字段已填充的文档传递到下一阶段。

  4. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 enriched_stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

要查看生成的 sample_weatherstream.enriched_stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: 2055
}
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$https