$lookup
Nesta página
Definição
A fase $lookup executa uma junção externa esquerda do fluxo de mensagens da $source
para uma coleção do Atlas no registro de conexão.
Dependendo do seu caso de uso, um estágio de pipeline $lookup
usa uma das três sintaxes a seguir:
Para saber mais, consulte Sintaxe $lookup.
Aviso
Usar $lookup
para enriquecimento de um fluxo pode reduzir a velocidade de processamento do fluxo.
O seguinte formulário protótipo ilustra todos os campos disponíveis:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>" } }
Sintaxe
O estágio $lookup
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
from | documento | Condicional | Documento que especifica uma collection em um banco de dados Atlas para unir às mensagens do seu Se você especificar este campo, você deverá especificar valores para todos os campos neste documento. Este campo não é exigido se você especificar um campo |
from.connectionName | string | Condicional | Nome da conexão no registro de conexões. Este campo não é exigido se você especificar um campo |
from.db | string | Condicional | Nome do banco de dados do Atlas que contém a coleção que você deseja unir. Este campo não é exigido se você especificar um campo |
from.coll | string | Condicional | Nome da coleção da qual você deseja participar. Este campo não é exigido se você especificar um campo |
localField | string | Condicional | Campo a partir de suas mensagens Este campo faz parte das seguintes sintaxes: |
foreignField | string | Condicional | Campo de documentos na coleção Este campo faz parte das seguintes sintaxes: |
let | documento | Condicional | |
gasoduto | documento | Condicional | Especifica o Este campo faz parte das seguintes sintaxes: |
como | string | Obrigatório | Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção |
Comportamento
A versão do Atlas Stream Processing de $lookup executa um left join externo de mensagens do seu $source
e os documentos em uma coleção especificada do Atlas. Essa versão se comporta de forma semelhante ao estágio $lookup
disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção Atlas do seu Registro de Conexão como o valor para o campo from
.
O pipeline pode conter uma fase $lookup
aninhada. Se você incluir uma fase $lookup
aninhada em seu pipeline, utilize a sintaxe from
padrão para especificar uma coleção na mesma conexão remota do Atlas que a fase $lookup
externa.
Exemplo
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
Se o seu pipeline tem $lookup
e $merge
na mesma coleção, os resultados do Atlas Stream Processing podem variar se você tentar manter uma visualização incremental. O Atlas Stream Processing processa várias mensagens de origem simultaneamente e, em seguida, mescla todas elas. Se várias mensagens tiverem o mesmo ID, que tanto $lookup
quanto $merge
usam, o Atlas Stream Processing poderá retornar resultados que ainda não foram materializados.
Exemplo
Considere o seguinte fluxo de entrada:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
Suponha que sua query contenha o seguinte dentro do pipeline:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
Se estiver tentando manter uma visualização gradual, o resultado poderá ser semelhante ao seguinte:
{ _id: 1, count: 5 }
No entanto, o Atlas Stream Processing pode retornar uma contagem de 5
ou 3
, dependendo se o Atlas Stream Processing processou os documentos.
Para obter mais informações, consulte $lookup
.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions
contém documentos do formato:
Onde o campo relative_humidity
descreve a umidade relativa em temperatura ambiente (20 graus Celsius), e condition
lista descritores verbais adequados para esse nível de umidade. Você pode usar a fase $lookup para enriquecer os relatórios meteorológicos em tempo real com descritores sugeridos para meteorologistas usarem nas transmissões meteorológicas.
A seguinte agregação tem quatro fases:
O estágio estabelece
$source
uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$lookup
junta os registros do banco de dadoshumidity_descriptions
aos relatórios meteorológicos no campodewPoint
.A fase
$match
exclui documentos que têm um campohumidity_info
vazio e passa documentos com um campohumidity_info
preenchido para a próxima fase.O estágio
$merge
grava a saída na coleção do Atlas chamadaenriched_stream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.enriched_stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: 2055 } } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.