$merge
定義
$mergeステージは、メッセージを書き込む接続レジストリで接続を指定します。 接続は Atlas 接続である必要があります。
$merge
パイプライン ステージには次のプロトタイプ形式があります。
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
構文
$mergeの Atlas Stream Processing バージョンは、Atlas Data Federation バージョンと同じフィールドのほとんどを使用します。 ただし、Atlas Stream Processing は Atlas 接続へのマージのみをサポートするため、 into
の構文は簡素化されます。 詳細については、Atlas Data Federation $merge
フィールドの説明を参照してください。
動作
$merge
は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$merge
ステージは 1 つだけです。
on
フィールドには、シャーディングされたコレクションに対する$merge
に対する特別な要件があります。 詳細については、「 $merge 構文 」を参照してください。
次のフィールドの値として動的式を使用できます。
into.db
into.coll
これにより、ストリーム プロセッサは、メッセージごとに異なるターゲット Atlas コレクションにメッセージを書き込むことができます。
例
次の形式のメッセージを生成するトランザクション イベントのストリームがあります。
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
これらをそれぞれ個別の Atlas データベースとコレクションに並べ替えるには、次の$merge
ステージを記述します。
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
この$merge
ステージ:
Very Important Industries
メッセージをVIP.subscription
という名前の Atlas コレクションに書き込みます。N. E. Buddy
メッセージをemployee.requisition
という名前の Atlas コレクションに書き込みます。Khan Traktor
メッセージをcontractor.billableHours
という名前の Atlas コレクションに書き込みます。
動的式は string として評価されるもののみを使用できます。 動的式の詳細については、「式演算子 」を参照してください。
動的式でデータベースまたはコレクションを指定しても、Atlas Stream Processing が特定のメッセージの式を評価できない場合、Atlas Stream Processing はそのメッセージをデッド レター キューに送信し、後続のメッセージを処理します。 デッドレターキューが設定されていない場合、Atlas Stream Processing はメッセージを完全にスキップし、後続のメッセージを処理します。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTime
に設定されます。$match
ステージでは、dewPoint.value
5.0
が 以下のドキュメントを除外し、 がdewPoint.value
5.0
より大きいドキュメントを次のステージに渡します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。