$lookup
定義
$lookupステージは、 $source
からのメッセージストリームを接続レジストリの Atlas コレクションに左外部結合を実行します。
ユースケースに応じて、 $lookup
パイプライン ステージは次の 3 つの構文のいずれかを使用します。
詳しくは、「 $lookup 構文 」を参照してください。
警告
$lookup
を使用してストリームを強化することで、ストリーム処理速度が低下する可能性があります。
次のプロトタイプ形式は、利用可能なすべてのフィールドを示しています。
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>" } }
構文
$lookup
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
from | ドキュメント | 条件付き |
このフィールドを指定する場合は、 このドキュメント内のすべてのフィールドに値を指定する必要があります。 このフィールドは |
from. connectionName | string | 条件付き | 接続レジストリ内の接続の名前。 このフィールドは |
from.db | string | 条件付き | 参加するコレクションを含む Atlas database の名前。 このフィールドは |
from.coll | string | 条件付き | 参加するコレクションの名前。 このフィールドは |
localField | string | 条件付き | |
foreignField | string | 条件付き | |
let | ドキュメント | 条件付き | パイプラインステージで使用する変数を指定します。 詳しくは、 let を参照してください。 このフィールドは、次の構文の一部です。 |
パイプライン | ドキュメント | 条件付き | 結合済みコレクションで実行する このフィールドは、次の構文の一部です。 |
as | string | 必須 | 入力ドキュメントに追加する新しい配列フィールドの名前。 この新しい配列フィールドには、 |
動作
Atlas Stream Processing バージョンの$lookupは、 $source
からのメッセージと指定された Atlas コレクション内のドキュメントの左外部結合を実行します。 このバージョンは、標準の MongoDB database で使用可能な$lookup
ステージと同様に動作します。 ただし、このバージョンでは、 from
フィールドの値として接続レジストリの Atlas コレクションを指定する必要があります。
パイプラインにはネストされた$lookup
ステージを含めることができます。 パイプラインにネストされた$lookup
ステージを含める場合は、標準のfrom
構文を使用して、外側の$lookup
ステージと同じリモート Atlas 接続でコレクションを指定する必要があります。
例
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
パイプラインで同じコレクションに$lookup
と$merge
の両方がある場合、インクリメンタル ビューを維持しようとすると、Atlas Stream Processing の結果が異なる場合があります。 Atlas Stream Processing は複数のソース メッセージを同時に処理し、それらをすべてマージします。 複数のメッセージが同じ ID を持ち、 $lookup
と$merge
の両方が使用する場合、Atlas Stream Processing ではまだマテリアライズドされていない結果が返されることがあります。
例
次の入力ストリームを考えてみましょう。
{ _id: 1, count: 2 } { _id: 1, count: 3 }
パイプライン内でクエリに次の内容が含まれているとします。
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
インクリメンタル ビューを維持しようとすると、次のような結果が予想されます。
{ _id: 1, count: 5 }
ただし、 Atlas Stream Processing Atlas Stream Processingドキュメントを処理したかどうかに応じて、5
または 3
のカウントが返される場合があります。
詳細については、 $lookup
を参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 humidity_descriptions
という名前のコレクションには次の形式のドキュメントが含まれています。
relative_humidity
フィールドは温度( 20摂氏)の相対温度を表し、 condition
はその温度のレベルに適した言語記述子をリストします。 $lookupステージを使用して、気象配信で使用する推奨される記述子を使用してストリーミング気象レポートを強化できます。
次の集計には 4 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTime
に設定されます。$lookup
ステージは、humidity_descriptions
データベースのレコードをdewPoint
フィールドの気象レポートに結合します。$match
ステージでは、humidity_info
フィールドが空のドキュメントは除外され、humidity_info
フィールドに入力されたドキュメントは次のステージに渡されます。$merge
ステージは、sample_weatherstream
データベース内のenriched_stream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
結果のsample_weatherstream.enriched_stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: 2055 } } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。