$lookup
定义
$lookup 阶段将来自 $source
的消息流与连接注册表中的 Atlas 集合进行左外连接。
根据您的使用案例,$lookup
管道阶段将使用以下三种语法之一:
要了解更多信息,请参阅$lookup 语法。
警告
使用 $lookup
扩充流可能会降低流处理速度。
以下原型表单展示了所有可用字段:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>" } }
语法
$lookup
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
from | 文档 | 可选的 | |
from.connectionName | 字符串 | 可选的 | 连接注册表中的连接名称。 如果您指定 |
from.db | 字符串 | 可选的 | 包含您要加入的集合的 Atlas 数据库名称。 如果您指定 |
from.coll | 字符串 | 可选的 | 您想要加入的集合的名称。 如果您指定 |
localField | 字符串 | 可选的 | |
foreignField | 字符串 | 可选的 | |
let | 文档 | 可选的 | |
管道 | 文档 | 可选的 | |
作为 | 字符串 | 必需 | 要添加到输入文档中的新数组字段的名称。这个新数组字段包含 |
行为
Atlas Stream Processing 版本的 $lookup 对来自 $source
的消息和指定 Atlas 集合中的文档执行左外连接。此版本的行为类似于标准 MongoDB 数据库中提供的 $lookup
阶段。但是,此版本要求您将连接注册表中的 Atlas 集合指定为 from
字段的值。
管道可以包含嵌套的 $lookup
阶段。如果在管道中包含嵌套的 $lookup
阶段,则必须使用标准 from
语法,在与外部 $lookup
阶段相同的远程 Atlas 连接中指定一个集合。
例子
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
如果管道在同一集合上同时具有 $lookup
和 $merge
,而您尝试维护增量视图,Atlas Stream Processing 结果可能会有所不同。Atlas Stream Processing 同时处理多个源消息,然后将它们合并在一起。如果多个消息具有 $lookup
和 $merge
都使用的相同 ID,则 Atlas Stream Processing 可能会返回尚未物化的结果。
例子
考虑以下输入流:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
假设查询包含管道内的以下内容:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
如果尝试保持增量视图,则可能会看到类似于以下内容的结果:
{ _id: 1, count: 5 }
但是,Atlas Stream Processing 可能会返回 5
或 3
的计数,具体取决于 Atlas Stream Processing 是否处理了文档。
有关更多信息,请参阅 $lookup
。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。名为 humidity_descriptions
的集合包含以下形式的文档:
其中,relative_humidity
字段描述了室温( 20 摄氏度)下的相对湿度, condition
列出了适合该湿度水平的语言描述符。您可以使用 $lookup 阶段,用推荐的描述符来丰富流媒体天气报告,方便气象学家在天气预报广播中使用。
以下聚合有四个阶段:
$source
阶段与在名为my_weatherdata
的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime
。$lookup
阶段将humidity_descriptions
数据库中的记录合并到dewPoint
字段的天气报告中。$match
阶段会排除humidity_info
字段为空的文档,并将humidity_info
字段已填充的文档传递到下一阶段。$merge
阶段将输出写入sample_weatherstream
数据库中名为enriched_stream
的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
要查看生成的 sample_weatherstream.enriched_stream
集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: 2055 } } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。