Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$lookup

Nesta página

  • Definição
  • Sintaxe
  • Comportamento
  • Exemplos

A fase $lookup executa uma junção externa esquerda do fluxo de mensagens da $source para uma coleção do Atlas no registro de conexão.

Dependendo do seu caso de uso, um estágio de pipeline $lookup usa uma das três sintaxes a seguir:

Para saber mais, consulte Sintaxe $lookup.

Aviso

Usar $lookup para enriquecimento de um fluxo pode reduzir a velocidade de processamento do fluxo.

O seguinte formulário protótipo ilustra todos os campos disponíveis:

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>"
}
}

O estágio $lookup recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

from

documento

Condicional

Documento que especifica uma collection em um banco de banco de dados Atlas para unir às mensagens do seu . Você deve especificar uma collection somente a partir do seu Registro de $source Conexão.

Se você especificar este campo, você deverá especificar valores para todos os campos neste documento.

Este campo não é exigido se você especificar um campo pipeline.

from.connectionName

string

Condicional

Nome da conexão no registro de conexões.

Este campo não é exigido se você especificar um campo pipeline.

from.db

string

Condicional

Nome do banco de dados do Atlas que contém a coleção que você deseja unir.

Este campo não é exigido se você especificar um campo pipeline.

from.coll

string

Condicional

Nome da coleção da qual você deseja participar.

Este campo não é exigido se você especificar um campo pipeline.

localField

string

Condicional

Campo a partir de suas mensagens $source do qual participar.

Este campo faz parte das seguintes sintaxes:

foreignField

string

Condicional

Campo de documentos na coleção from para participar.

Este campo faz parte das seguintes sintaxes:

let

documento

Condicional

Especifica as variáveis a serem usadas nos estágios do pipeline. Para saber mais, consulte let.

Este campo faz parte das seguintes sintaxes:

gasoduto

documento

Condicional

Especifica o pipeline a ser executado na coleção associada. Para saber mais, consulte pipeline.

Este campo faz parte das seguintes sintaxes:

como

string

Obrigatório

Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção from. Se o nome especificado já existir como campo no documento de entrada, esse campo será substituído.

A versão do Atlas Stream Processing de $lookup executa um left join externo de mensagens do seu $source e os documentos em uma coleção especificada do Atlas. Essa versão se comporta de forma semelhante ao estágio $lookup disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção Atlas do seu Registro de Conexão como o valor para o campo from.

O pipeline pode conter uma fase $lookup aninhada. Se você incluir uma fase $lookup aninhada em seu pipeline, utilize a sintaxe from padrão para especificar uma coleção na mesma conexão remota do Atlas que a fase $lookup externa.

Exemplo

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

Se o seu pipeline tem $lookup e $merge na mesma coleção, os resultados do Atlas Stream Processing podem variar se você tentar manter uma visualização incremental. O Atlas Stream Processing processa várias mensagens de origem simultaneamente e, em seguida, mescla todas elas. Se várias mensagens tiverem o mesmo ID, que tanto $lookup quanto $merge usam, o Atlas Stream Processing poderá retornar resultados que ainda não foram materializados.

Exemplo

Considere o seguinte fluxo de entrada:

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

Suponha que sua query contenha o seguinte dentro do pipeline:

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

Se estiver tentando manter uma visualização gradual, o resultado poderá ser semelhante ao seguinte:

{ _id: 1, count: 5 }

No entanto, o Atlas Stream Processing pode retornar uma contagem de 5 ou 3, dependendo se o Atlas Stream Processing processou os documentos.

Para obter mais informações, consulte $lookup.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions contém documentos do formato:

Onde o campo relative_humidity descreve a umidade relativa em temperatura ambiente (20 graus Celsius), e condition lista descritores verbais adequados para esse nível de umidade. Você pode usar a fase $lookup para enriquecer os relatórios meteorológicos em tempo real com descritores sugeridos para meteorologistas usarem nas transmissões meteorológicas.

A seguinte agregação tem quatro fases:

  1. O estágio estabelece $source uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $lookup junta os registros do banco de dados humidity_descriptions aos relatórios meteorológicos no campo dewPoint.

  3. A fase $match exclui documentos que têm um campo humidity_info vazio e passa documentos com um campo humidity_info preenchido para a próxima fase.

  4. O estágio $merge grava a saída na coleção do Atlas chamada enriched_stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.enriched_stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: 2055
}
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$https