Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$merge

項目一覧

  • 定義
  • 構文
  • 動作

$mergeステージは、メッセージを書き込む接続レジストリで接続を指定します。 接続は Atlas 接続である必要があります。

$mergeパイプライン ステージには次のプロトタイプ形式があります。

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge",
"whenNotMatched": "insert | discard"
}
}

$mergeの Atlas Stream Processing バージョンは、Atlas Data Federation バージョンと同じフィールドのほとんどを使用します。 ただし、Atlas Stream Processing は Atlas 接続へのマージのみをサポートするため、 intoの構文は簡素化されます。 詳細については、Atlas Data Federation $mergeフィールドの説明を参照してください。

$merge は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$mergeステージは 1 つだけです。

onフィールドには、シャーディングされたコレクションに対する$mergeに対する特別な要件があります。 詳細については、「 $merge 構文 」を参照してください。

次のフィールドの値として動的式を使用できます。

  • into.db

  • into.coll

これにより、ストリーム プロセッサは、メッセージごとに異なるターゲット Atlas コレクションにメッセージを書き込むことができます。

次の形式のメッセージを生成するトランザクション イベントのストリームがあります。

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

これらをそれぞれ個別の Atlas データベースとコレクションに並べ替えるには、次の$mergeステージを記述します。

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}

この$mergeステージ:

  • Very Important IndustriesメッセージをVIP.subscriptionという名前の Atlas コレクションに書き込みます。

  • N. E. Buddyメッセージをemployee.requisitionという名前の Atlas コレクションに書き込みます。

  • Khan Traktorメッセージをcontractor.billableHoursという名前の Atlas コレクションに書き込みます。

動的式は string として評価されるもののみを使用できます。 動的式の詳細については、「式演算子 」を参照してください。

動的式でデータベースまたはコレクションを指定しても、Atlas Stream Processing が特定のメッセージの式を評価できない場合、Atlas Stream Processing はそのメッセージをデッド レター キューに送信し、後続のメッセージを処理します。 デッドレターキューが設定されていない場合、Atlas Stream Processing はメッセージを完全にスキップし、後続のメッセージを処理します。

戻る

$emit

項目一覧