$lookup
Nesta página
Definição
A fase $lookup executa uma junção externa esquerda do fluxo de mensagens da $source
para uma coleção do Atlas no registro de conexão.
Dependendo do seu caso de uso, um estágio de pipeline $lookup
usa uma das três sintaxes a seguir:
Para saber mais, consulte Sintaxe $lookup.
Aviso
Usar $lookup
para enriquecimento de um fluxo pode reduzir a velocidade de processamento do fluxo.
O seguinte formulário protótipo ilustra todos os campos disponíveis:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>" } }
Sintaxe
O estágio $lookup
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
from | documento | Condicional | Documento que especifica uma collection em um banco de banco de dados Atlas para unir às mensagens do seu . Você deve especificar uma collection somente a partir do seu Registro de Se você especificar este campo, você deverá especificar valores para todos os campos neste documento. Este campo não é exigido se você especificar um campo |
from.connectionName | string | Condicional | Nome da conexão no registro de conexões. Este campo não é exigido se você especificar um campo |
from.db | string | Condicional | Nome do banco de dados do Atlas que contém a coleção que você deseja unir. Este campo não é exigido se você especificar um campo |
from.coll | string | Condicional | Nome da coleção da qual você deseja participar. Este campo não é exigido se você especificar um campo |
localField | string | Condicional | Campo a partir de suas mensagens Este campo faz parte das seguintes sintaxes: |
foreignField | string | Condicional | Campo de documentos na coleção Este campo faz parte das seguintes sintaxes: |
let | documento | Condicional | |
gasoduto | documento | Condicional | Especifica o Este campo faz parte das seguintes sintaxes: |
como | string | Obrigatório | Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção |
Comportamento
A versão do Atlas Stream Processing de $lookup executa um left join externo de mensagens do seu $source
e os documentos em uma coleção especificada do Atlas. Essa versão se comporta de forma semelhante ao estágio $lookup
disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção Atlas do seu Registro de Conexão como o valor para o campo from
.
O pipeline pode conter uma fase $lookup
aninhada. Se você incluir uma fase $lookup
aninhada em seu pipeline, utilize a sintaxe from
padrão para especificar uma coleção na mesma conexão remota do Atlas que a fase $lookup
externa.
Exemplo
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
Se o seu pipeline tem $lookup
e $merge
na mesma coleção, os resultados do Atlas Stream Processing podem variar se você tentar manter uma visualização incremental. O Atlas Stream Processing processa várias mensagens de origem simultaneamente e, em seguida, mescla todas elas. Se várias mensagens tiverem o mesmo ID, que tanto $lookup
quanto $merge
usam, o Atlas Stream Processing poderá retornar resultados que ainda não foram materializados.
Exemplo
Considere o seguinte fluxo de entrada:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
Suponha que sua query contenha o seguinte dentro do pipeline:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
Se estiver tentando manter uma visualização gradual, o resultado poderá ser semelhante ao seguinte:
{ _id: 1, count: 5 }
No entanto, o Atlas Stream Processing pode retornar uma contagem de 5
ou 3
, dependendo se o Atlas Stream Processing processou os documentos.
Para obter mais informações, consulte $lookup
.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions
contém documentos do formato:
Onde o campo relative_humidity
descreve a umidade relativa em temperatura ambiente (20 graus Celsius), e condition
lista descritores verbais adequados para esse nível de umidade. Você pode usar a fase $lookup para enriquecer os relatórios meteorológicos em tempo real com descritores sugeridos para meteorologistas usarem nas transmissões meteorológicas.
A seguinte agregação tem quatro fases:
O estágio estabelece
$source
uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$lookup
junta os registros do banco de dadoshumidity_descriptions
aos relatórios meteorológicos no campodewPoint
.A fase
$match
exclui documentos que têm um campohumidity_info
vazio e passa documentos com um campohumidity_info
preenchido para a próxima fase.O estágio
$merge
grava a saída na coleção do Atlas chamadaenriched_stream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.enriched_stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: 2055 } } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.