Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$lookup

이 페이지의 내용

  • 정의
  • 구문
  • 행동
  • 예시

$lookup 단계는 $source 메시지 스트림의 연결 레지스트리 Atlas 컬렉션에 대한왼쪽 외부 조인을 수행합니다.

사용 사례에 따라 $lookup 파이프라인 단계는 다음 세 가지 구문 중 하나를 사용합니다.

자세히 알아보려면 $lookup 구문을 참조하세요.

경고

$lookup을 사용하여 스트림을 보강하면 스트림 처리 속도가 느려질 수 있습니다.

다음 프로토타입 양식은 사용 가능한 모든 필드를 보여 줍니다.

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>"
}
}

$lookup 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
FROM
문서
조건부

의 메시지에 결합할 Atlas 데이터베이스 의 컬렉션 을 지정하는 문서입니다. 연결 레지스트리에서만 컬렉션 을 지정해야 합니다.$source

이 필드 를 지정하는 경우 이 문서 의 모든 필드에 대한 값을 지정해야 합니다.

pipeline 필드 를 지정하는 경우 이 필드 는 필요하지 않습니다.

from.connectionName
문자열
조건부

연결 레지스트리의 연결 이름입니다.

pipeline 필드 를 지정하는 경우 이 필드 는 필요하지 않습니다.

from.db
문자열
조건부

참여하려는 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다.

pipeline 필드 를 지정하는 경우 이 필드 는 필요하지 않습니다.

from.coll
문자열
조건부

가입하려는 컬렉션의 이름입니다.

pipeline 필드 를 지정하는 경우 이 필드 는 필요하지 않습니다.

localField
문자열
조건부

참여할 $source 메시지의 입력란입니다.

이 필드는 다음 구문의 일부입니다.

foreignField
문자열
조건부

조인할 from 컬렉션에 있는 문서의 필드입니다.

이 필드는 다음 구문의 일부입니다.

하자
문서
조건부

파이프라인 단계에서 사용할 변수를 지정합니다. 자세히 알아보려면 let을 참조하세요.

이 필드는 다음 구문의 일부입니다.

파이프라인
문서
조건부

조인된 컬렉션에서 실행할 pipeline을 지정합니다. 자세한 내용은 파이프라인을 참조하세요.

이 필드는 다음 구문의 일부입니다.

방식
문자열
필수 사항
입력 문서에 추가할 새 배열 필드의 이름입니다. 새 배열 필드에는 from 컬렉션에서 매칭되는 문서가 포함됩니다. 지정한 이름이 입력 문서에 이미 필드로 존재하는 경우 기존 필드를 덮어씁니다.

$lookup의 Atlas Stream Processing 버전은 $source의 메시지와 지정된 Atlas 컬렉션의 문서에 대한 왼쪽 외부 조인을 수행합니다. 이 버전은 표준 MongoDB 데이터베이스에서 사용 가능한 $lookup 단계와 유사하게 작동합니다. 하지만 이 버전에서는 from 필드의 값으로 연결 레지스트리에서 Atlas 컬렉션을 지정해야 합니다.

파이프라인에는 중첩된 $lookup 단계가 포함될 수 있습니다. 파이프라인에 중첩된 $lookup 단계를 포함하는 경우, 표준 from 구문을 사용하여 외부 $lookup 단계와 동일한 원격 Atlas 연결에 컬렉션을 지정해야 합니다.

예시

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

파이프라인의 동일한 컬렉션에 $lookup$merge가 있는 경우 구체화된 뷰를 유지하려고 하면 Atlas Stream Processing 결과가 다를 수 있습니다. Atlas Stream Processing은 여러 소스 메시지를 동시에 처리한 후 이를 모두 병합합니다. $lookup$merge를 모두 사용하는 동일한 ID 를 가진 메시지가 여러 개 있는 경우 Atlas Stream Processing에서 아직 구체화되지 않은 결과를 반환할 수 있습니다.

예시

다음 입력 스트림을 고려하세요.

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

파이프라인 내부에 다음과 같은 쿼리가 포함되어 있다고 가정해 보겠습니다.

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

증분 보기를 유지하려는 경우 다음과 유사한 결과를 예상할 수 있습니다.

{ _id: 1, count: 5 }

하지만 Atlas Stream Processing은 Atlas Stream Processing이 문서를 처리했는지 여부에 따라 5 또는 3을 반환할 수 있습니다.

자세한 내용은 $lookup을 참조하세요.

스트리밍 데이터 소스는 샘플 날씨 데이터셋의 스키마에 따라 다양한 위치에서 상세한 날씨 보고서를 생성합니다. humidity_descriptions 라는 이름의 컬렉션에는 다음과 같은 형식의 문서가 포함되어 있습니다.

여기서 relative_humidity 필드는 실온(섭씨 20도)에서의 상대 습도를 설명하고, condition 필드에는 해당 습도 수준에 적합한 구두 설명이 나열되어 있습니다. $lookup 단계를 사용하면 기상학자가 날씨 방송에 사용할 수 있는 설명자를 제안하여 스트리밍 날씨 보고서를 풍부하게 만드는 데 도움이 됩니다.

다음 집계에는 4단계가 있습니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여 ingestionTime으로 설정합니다.

  2. $lookup 단계에서는 humidity_descriptions 데이터베이스의 기록을 dewPoint 필드의 일기 예보에 결합합니다.

  3. $match 단계에서는 humidity_info 필드가 비어 있는 문서를 제외하고, humidity_info 필드가 채워진 문서를 다음 단계로 전달합니다.

  4. $merge 단계는 sample_weatherstream 데이터베이스의 enriched_stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

결과 sample_weatherstream.enriched_stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: 2055
}
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$validate

이 페이지의 내용