Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$merge

項目一覧

  • 定義
  • 構文
  • 動作

$mergeステージは、メッセージを書き込む接続レジストリで接続を指定します。 接続は Atlas 接続である必要があります。

$mergeパイプライン ステージには次のプロトタイプ形式があります。

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge",
"whenNotMatched": "insert | discard"
}
}

$mergeの Atlas Stream Processing バージョンは、Atlas Data Federation バージョンと同じフィールドのほとんどを使用します。 ただし、Atlas Stream Processing は Atlas 接続へのマージのみをサポートするため、 intoの構文は簡素化されます。 詳細については、Atlas Data Federation $mergeフィールドの説明を参照してください。

$merge は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$mergeステージは 1 つだけです。

onフィールドには、シャーディングされたコレクションに対する$mergeに対する特別な要件があります。 詳細については、「 $merge 構文 」を参照してください。

次のフィールドの値として動的式を使用できます。

  • into.db

  • into.coll

これにより、ストリーム プロセッサは、メッセージごとに異なるターゲット Atlas コレクションにメッセージを書き込むことができます。

次の形式のメッセージを生成するトランザクション イベントのストリームがあります。

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

これらをそれぞれ個別の Atlas データベースとコレクションに並べ替えるには、次の$mergeステージを記述します。

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}

この$mergeステージ:

  • Very Important IndustriesメッセージをVIP.subscriptionという名前の Atlas コレクションに書き込みます。

  • N. E. Buddyメッセージをemployee.requisitionという名前の Atlas コレクションに書き込みます。

  • Khan Traktorメッセージをcontractor.billableHoursという名前の Atlas コレクションに書き込みます。

動的式は string として評価されるもののみを使用できます。 動的式の詳細については、「式演算子 」を参照してください。

動的式でデータベースまたはコレクションを指定しても、Atlas Stream Processing が特定のメッセージの式を評価できない場合、Atlas Stream Processing はそのメッセージをデッド レター キューに送信し、後続のメッセージを処理します。 デッドレターキューが設定されていない場合、Atlas Stream Processing はメッセージを完全にスキップし、後続のメッセージを処理します。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTimeに設定されます。

  2. $matchステージでは、dewPoint.value 5.0が 以下のドキュメントを除外し、 がdewPoint.value 5.0より大きいドキュメントを次のステージに渡します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$emit