Docs Menu
Docs Home
/
MongoDBマニュアル
/ / /

$merge(集計)

項目一覧

  • 定義
  • 互換性
  • 構文
  • Considerations
  • 制限事項

注意

このページでは、集計パイプラインの結果をコレクションに出力する $merge ステージについて説明します。複数のドキュメントを 1 つのドキュメントにマージする $mergeObjects 演算子については、$mergeObjects を参照してください。

$merge

集計パイプラインの結果を指定されたコレクションに書き込みます。 $merge演算子はパイプラインの最後のステージ である必要があります。

$mergeステージ:

  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 集約されている同じコレクションに出力できます。 詳細については、 「 集計されているのと同じコレクションへの出力 」を参照してください。

  • 集計パイプライン$merge または $out ステージを使用する場合は、次の点を考慮してください。

    • MongoDB 5.0 以降では、クラスター内のすべてのノードの featureCompatibilityVersion5.0 以上に設定されており、読み込み設定(read preference) でセカンダリ読み取りが許可されている場合、$merge ステージのパイプラインをレプリカセットのセカンダリノードで実行できます。

      • $merge $out ステージはセカンダリ ノードで実行されますが、書込み (write) 操作はプライマリ ノードに送信されます。

      • すべてのドライバー バージョンがセカンダリ ノードに送信される $merge 操作をサポートしているわけではありません。詳細についてはドライバーのドキュメントを参照してください。

    • MongoDB の以前のバージョンでは、$out または $merge ステージを持つパイプラインは常にプライマリ ノードで実行され、読み込み設定 (read preference) は考慮されませんでした。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 結果(新しいドキュメントの挿入、ドキュメントのマージ、ドキュメントの置換、既存のドキュメントの保持、操作の失敗、カスタムアップデートパイプラインによるドキュメントの処理)を既存のコレクションに組み込むことができます。

  • シャーディングされたコレクションに出力できます。入力コレクションもシャーディング可能です。

集計結果もコレクションに出力する$outステージとの比較については、「 $merge$outの比較 」を参照してください。

注意

オンデマンドのマテリアライズドビュー

$merge は、コレクションの完全な置換を行うのではなく、パイプラインの結果を既存の出力コレクションに組み込むことができます。この機能により、ユーザーはオンデマンドのマテリアライズドビューを作成できます。ここでは、パイプラインの実行時に出力コレクションの内容が段階的に更新されます。

このユースケースの詳細については、このページの例だけでなく、オンデマンド マテリアライズドビューも参照してください。

マテリアライズドビューは読み取り専用のビューとは別物です。読み取り専用ビューの作成については、「読み取り専用ビュー」を参照してください。

次の環境でホストされる配置には $merge を使用できます。

  • MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです

  • MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン

  • MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン

$merge の構文は次のとおりです。

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

以下に例を挙げます。

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

同じデータベース内のコレクションへの書き込みを含め、 $mergeのすべてのデフォルト オプションを使用する場合は、簡略化された形式を使用できます。

{ $merge: <collection> } // Output collection is in the same database

$mergeは、次のフィールドを持つドキュメントを取得します。

フィールド
説明

出力コレクション。次のいずれかを指定します。

  • 集計が実行されるのと同じデータベース内のコレクションに出力する文字列としてのコレクション名。以下に例を示します。

    into: "myOutput"

  • 指定されたデータベース内のコレクションに出力するドキュメント内のデータベース名とコレクション名。以下に例を示します。

    into: { db:"myDB", coll:"myOutput" }

出力コレクションが存在しない場合は、 $mergeコレクションが作成されます。

出力コレクションはシャーディングされたコレクションにすることができます。

任意。ドキュメントのユニークな識別子として機能するフィールド。識別子は、結果ドキュメントが出力コレクション内の既存のドキュメントと一致するかどうかを判断します。次のいずれかを指定します。

  • 文字列としての 1 つのフィールド名。以下に例を示します。

    on: "_id"

  • 配列内のフィールドの組み合わせ。以下に例を示します。

    on: [ "date", "customerId" ]
    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.

指定されたフィールドの場合、次のようになります。

  • 集計結果ドキュメントには、on フィールドが _id フィールドである場合を除き、on で指定されたフィールドが含まれている必要があります。結果ドキュメントに _id フィールドがない場合、MongoDB はそれを自動的に追加します。

  • 指定されたフィールドに null 値または配列値を含めることはできません。

$mergeには、オン識別子フィールド対応するキーを持つユニークインデックスが必要です。インデックスキーを指定する順序は重要ではありませんが、ユニークインデックスにはキーとして on フィールドのみを含める必要があります。

  • また、インデックスが保持する照合は、集計が保持する照合と同じであることが必要です。

  • 一意なインデックスはスパース インデックスにできます。

  • 一意なインデックスを部分インデックスにすることはできません。

  • 既に存在する出力コレクションの場合、対応するインデックスが既に存在している必要があります。

オンのデフォルト値は出力コレクションによって異なります。

  • 出力コレクションが存在しない場合、オン識別子は である必要があり、デフォルトは_idフィールドになります。 対応する一意の_idインデックスが自動的に作成されます。

  • 既存の出力コレクションがシャーディングされていない場合、on 識別子はデフォルトで _id フィールドになります。

  • 既存の出力コレクションがシャーディングされたコレクションの場合、オン識別子はデフォルトですべてのシャードキー フィールドと_idフィールドになります。 別のon識別子を指定する場合、 onにはすべてのシャードキー フィールドが含まれている必要があります。

任意。 $merge結果ドキュメントとコレクション内の既存のドキュメントで指定され たオン フィールドの値が同じ場合の の動作。

次のいずれかを指定できます。

  • 以下の事前定義されたアクション文字列の中から 1 つ。

    アクション
    説明

    出力コレクション内の既存のドキュメントを一致する結果ドキュメントに置き換えます。

    置換を実行する場合、置換ドキュメントによって _id 値やシャードキー値(出力コレクションがシャーディングされている場合)は変更されません。それ以外の場合、操作はエラーを生成します。

    このエラーを回避するには、オンフィールドに_idフィールドが含まれていない場合は、集計結果の_idフィールドを削除してエラーを回避します(たとえば、先行する$unsetステージなど)。

    既存のドキュメントを出力コレクションに保持します。

    "merge" (デフォルト)

    一致するドキュメントをマージします($mergeObjects 演算子と同様)。

    • 結果ドキュメントに既存のドキュメントにないフィールドが含まれている場合は、それらの新しいフィールドを既存のドキュメントに追加します。

    • 結果ドキュメントに既存のドキュメントのフィールドが含まれている場合は、既存のフィールド値を結果ドキュメントのフィールド値に置き換えます。

    たとえば、出力コレクションに以下のドキュメントがあるとします。

    { _id: 1, a: 1, b: 1 }

    また、集計結果には以下のドキュメントがあるとします。

    { _id: 1, b: 5, z: 1 }

    その場合、マージされたドキュメントは以下のようになります。

    { _id: 1, a: 1, b: 5, z: 1 }

    マージを実行する場合、マージされたドキュメントによって _id 値やシャードキー値(出力コレクションがシャーディングされている場合)は変更されません。それ以外の場合、操作はエラーを生成します。

    このエラーを回避するには、オンフィールドに_idフィールドが含まれていない場合は、集計結果の_idフィールドを削除してエラーを回避します(たとえば、先行する$unsetステージなど)。

    集計操作を停止して失敗させます。前ドキュメントからの出力コレクションへの変更は、元には戻されません。

  • コレクション内のドキュメントを更新するための集計パイプライン。

    [ <stage1>, <stage2> ... ]

    パイプラインは、次のステージのみで構成できます。

    パイプラインはオンフィールドの値を変更できません。 たとえば、フィールドmonthで一致している場合、パイプラインはmonthフィールドを変更できません。

    whenMatched pipeline は、$<field> を使用して出力コレクション内の既存のドキュメントのフィールドに直接アクセスできます。

    集計結果ドキュメントのフィールドにアクセスするには、次のいずれかを使用します。

    • フィールドにアクセスするための組み込み変数$$new。具体的には、 $$new.<field>$$new変数は、 let指定が省略されている場合にのみ使用できます。

    • letフィールド内のユーザー定義の変数。

      二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

      その他の例については、 「変数を使用してマージをカスタマイズする」を参照してください。

任意。 whenMatched パイプライン で使用する変数を指定します。

以下のように、変数名と値の式を使ってドキュメントを指定します。

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

指定されていない場合は、デフォルトで{ new: "$$ROOT" }になります ( ROOTを参照)。whenMatched パイプラインは$$new変数にアクセスできます。

whenMatched パイプライン内の変数にアクセスするには、次のようにします。

二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

例については、「変数を使用したマージのカスタマイズ 」を参照してください。

任意。 結果ドキュメントがアウトコレクション内の既存のドキュメントと一致しない場合の$mergeの動作。

以下の事前定義されたアクション文字列のいずれかを指定できます。

アクション
説明
"insert" (Default)

ドキュメントを出力コレクションに挿入します。

ドキュメントを破棄します。 具体的には、 $mergeはドキュメントを出力コレクションに挿入しません。

集計操作を停止して失敗させます。すでに出力コレクションに書き込まれた変更は、元には戻されません。

集計パイプライン結果のドキュメントに_idフィールドが存在しない場合は、 $mergeステージによって自動的にフィールドが生成されます。

たとえば、次の集計パイプラインでは、 $project$mergeに渡されるドキュメントから_idフィールドを除外します。$mergeがこれらのドキュメントを"newCollection"に書き込むと、 $mergeは新しい_idフィールドと値を生成します。

db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

指定された出力コレクションが存在しない場合は、 $merge操作によって新しいコレクションが作成されます。

  • 出力コレクションは、 $mergeが最初のドキュメントをコレクションに書込んだときに作成され、すぐに表示されます。

  • 集計が失敗した場合、エラーの前に $merge によって完了した書き込みはロールバックされません。

注意

レプリカセットまたはスタンドアロンでは、出力データベースが存在しない場合、 $mergeによってデータベースも作成されます。

シャーディングされたクラスターの場合、指定された出力データベースがすでに存在している必要があります。

出力コレクションが存在しない場合、 $mergeではオン識別子が_idフィールドである必要があります。 存在しないコレクションに対して別のonフィールド値を使用するには、まずフィールドに一意なインデックスを作成してコレクションを作成します。 たとえば、出力コレクションnewDailySales201905が存在せず、オン識別子としてsalesDateフィールドを指定する場合は次のようになります。

db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )

$mergeステージはシャーディングされたコレクションに出力できます。 出力コレクションがシャーディングされると、 $merge_idフィールドとすべてのシャードキーフィールドをデフォルトのオン識別子として使用します。 デフォルトを上書きする場合、オン識別子にはすべてのシャードキーフィールドを含める必要があります。

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

たとえば、 sharding enabledがあるデータベースで、 sh.shardCollection()メソッドを使用して、 postcodeフィールドをシャードキーとして持つ新しいシャーディングされたコレクションnewrestaurantsを作成します。

sh.enableSharding("exampledb"); // Sharding must be enabled in the database
sh.shardCollection(
"exampledb.newrestaurants", // Namespace of the collection to shard
{ postcode: 1 }, // Shard key
);

newrestaurantsコレクションには、月別 ( dateフィールド)および郵便番号別 (シャードキー)の新しいレストランのオープンに関する情報を含むドキュメントが含まれます。具体的には、オン識別子は["date", "postcode"]です (フィールドの順序は重要ではありません)。$mergeには、オン識別子フィールドに対応するキーを持つユニークインデックスが必要なので、ユニークインデックスを作成します(フィールドの順序は関係ありません)。 [ 1 ]

use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

シャーディングされたコレクションrestaurantsと一意のインデックスを作成すると、 $mergeを使用して集計結果をこのコレクションに出力でき、この例のように[ "date", "postcode" ]に一致します。

use exampledb
db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
[1] sh.shardCollection() メソッドは、シャードキーが範囲ベースであり、コレクションが空であり、シャードキーにユニークインデックスがまだ存在しない場合に、{ unique: true } オプションを渡すとシャードキーにユニークインデックスを作成することもできます。前述の例では、on 識別子はシャードキーと別のフィールドであるため、対応するインデックスを作成するための別の操作が必要です。

$mergeは、オン仕様に基づいて一致するドキュメントが集計結果に含まれている場合、出力コレクション内の既存のドキュメントを置き換えることができます。 そのため、集計結果にコレクション内のすべての既存のドキュメントと一致するドキュメントが含まれ、$merge whenMatched に対して "replace" が指定されている場合、 は既存のコレクション内のすべてのドキュメントを置き換えることができます。

ただし、集計結果に関係なく既存のコレクションを置き換えるには、代わりに $out を使用します。

$merge$mergeによって既存のドキュメントの_id 値が変更されると、エラーが発生します。

Tip

このエラーを回避するには、オンフィールドに_idフィールドが含まれていない場合は、集計結果の_idフィールドを削除してエラーを回避します(たとえば、先行する$unsetステージなど)。

さらに、シャーディングされたコレクションの場合、既存のドキュメントのシャードキー値が変更されると、 $mergeはエラーも生成します。

エラーが発生する前に$mergeによって完了した書き込みはロールバックされません。

$mergeオンフィールドに使用する一意のインデックスが集計の途中で削除された場合、集計が強制終了される保証はありません。集計が続行される場合、ドキュメントに重複する on フィールド値が存在しないという保証はありません。

$mergeが出力コレクションの一意のインデックスに違反するドキュメントを書込もうとした場合、操作でエラーが生成されます。 例:

コレクションでスキーマ検証が使用されており、 validationActionerrorに設定されている場合、無効なドキュメントを挿入したり、 $mergeを使用して無効な値を持つドキュメントを更新したりすると、 MongoServerErrorがスローされ、ドキュメントはターゲット コレクションに書き込まれません。 無効なドキュメントが複数ある場合、最初に見つかった無効なドキュメントのみがエラーをスローします。 すべての有効なドキュメントはターゲット コレクションに書き込まれ、すべての無効なドキュメントは書込みに失敗します。

$mergeステージに対して次の すべて が当てはまる場合、 $mergeはドキュメントを出力コレクションに直接挿入します。

  • whenMatchedの値は集計パイプラインであり、

  • whenNotMatchedの値はinsertで、かつ

  • 出力コレクションに一致するドキュメントがありません。

$merge は、ドキュメントを出力コレクションに直接挿入します。

$mergeの導入により、MongoDB は集計パイプラインの結果をコレクションに書き込むための 2 つのステージ$merge$outを提供します。

  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 結果(新しいドキュメントの挿入、ドキュメントのマージ、ドキュメントの置換、既存のドキュメントの保持、操作の失敗、カスタムアップデートパイプラインによるドキュメントの処理)を既存のコレクションに組み込むことができます。

  • 出力コレクションが既に存在する場合は、完全に置き換えます。

  • シャーディングされたコレクションに出力できます。入力コレクションもシャーディング可能です。

  • シャーディングされたコレクションに出力できません。ただし、入力コレクションはシャーディングできます。

  • 以下の SQL ステートメントに対応します。

    • MERGE.

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

    • マテリアライズドビューの作成と更新。

  • 以下の SQL ステートメントに対応します。

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

警告

$mergeの出力が集約されている同じコレクションに出力される場合、ドキュメントが複数回更新されたり、操作によって無限ループが発生したりする可能性があります。 この動作は、 $mergeによって実行されるアップデートによって、ディスクに保存されているドキュメントの物理的な場所が変更された場合に発生します。 ドキュメントの物理的な場所が変更されると、 $mergeはそのドキュメントを完全に新しいドキュメントとして表示し、追加のアップデートが行われることがあります。 この動作の詳細については、「 日付 の問題 」を参照してください。

$merge は、集約されている同じコレクションに出力できます。$lookupなど、パイプラインの他のステージに表示されるコレクションに出力することもできます。

制限事項
説明
集計パイプラインでは、 トランザクション$merge 内で を使用できません。
集計パイプラインでは、 $mergeを使用して時系列コレクションに出力することはできません。
Separate from materialized view
ビュー定義$mergeステージを含めることはできません。 ビュー定義にネストされたパイプラインが含まれている場合(たとえば、ビュー定義に$facetステージが含まれている場合)、この$mergeステージの制限はネストされたパイプラインにも適用されます。
$lookup ステージ
$lookupステージのネストされたパイプラインには$mergeステージを含めることはできません。
$facet ステージ
$facetステージのネストされたパイプラインには$mergeステージを含めることはできません。
$unionWith ステージ
$unionWithステージのネストされたパイプラインには$mergeステージを含めることはできません。
"linearizable" 読み取り保証 (read concern)

$mergeステージは読み取り保証 (read concern)"linearizable"と組み合わせて使用することはできません。つまり、 "linearizable"読み取り保証(read concern) をdb.collection.aggregate()に対して指定した場合、$mergeステージをパイプラインに含めることはできません。

出力コレクションが存在しない場合、 $mergeはコレクションを作成します。

たとえば、zoo データベース内の salaries という名前のコレクションには、従業員の給与と所属部門の履歴を格納します。

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])

$group } ステージと$mergeステージを使用して、現在salariesコレクションに含まれているデータからbudgetsreportingデータベース内)という名前のコレクションをまず作成できます。

注意

レプリカセット またはスタンドアロン配置では、出力データベースが存在しない場合、 $mergeによってデータベースも作成されます。

シャーディングされたクラスター配置の場合、指定された出力データベースがすでに存在している必要があります。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  • $group ステージを使って、給与を fiscal_yeardept でグループ化します。

  • $mergeステージは、先行する$groupステージの出力をreportingデータベース内のbudgetsコレクションに書き込みます。

新しい budgets コレクション内のドキュメントを表示するには次のようにします。

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets コレクションには次のドキュメントが含まれます。

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

Tip

以下も参照してください。

次の例では、前述の例のコレクションを使用します。

例の salaries コレクションには、以下のように従業員の給与と所属部門の履歴が含まれています。

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

例の budgets コレクションには、以下のように累積年間予算が含まれています。

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

現在の会計年度(この例では、2019)では、以下のように新しい従業員が salaries コレクションに追加され、新しい人数が翌年に事前に割り当てられます。

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
])

新しい給与情報を反映するように budgets コレクションを更新するには、次の集計パイプラインは以下を使用します。

  • $match ステージを使って、fiscal_year2019 以降であるすべてのドキュメントを検索します。

  • $group ステージを使って、給与を fiscal_yeardept でグループ化します。

  • $mergeを使用して結果セットをbudgetsコレクションに書込み、同じ_id値を持つドキュメント(この例では、会計年度と部門のあるドキュメント)を置き換えます。 コレクション内に一致しないドキュメントの場合、 $mergeは新しいドキュメントを挿入します。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

集計が実行された後、次のように budgets コレクション内のドキュメントを表示します。

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets コレクションには、2019 会計年度の新しい給与データが組み込まれ、2020 会計年度の新しいドキュメントが追加されています。

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

Tip

以下も参照してください。

$mergeがコレクション内の既存のデータを上書きしないようにするには、 whenMatchedkeepExistingまたは失敗に設定します。

zoo データベース内の salaries コレクションの例には、従業員の給与と部門の履歴が含まれています。

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

reporting データベースのコレクション orgArchive には、過去の会計年度における部門の履歴としての組織レコードが含まれています。アーカイブされたレコードは変更しないでください。

{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }

orgArchive コレクションには、fiscal_year フィールドと dept フィールドの一意の複合インデックスがあります。具体的には、会計年度と部門の 1 つの組み合わせに対して最大 1 つのレコードが存在する必要があります。

db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

現在の会計年度末(この例では2019)の時点で、salaries コレクションには次のドキュメントが含まれています。

{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
{ "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
{ "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
{ "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
{ "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
{ "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
{ "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
{ "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
{ "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
{ "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
{ "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
{ "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }

終了したばかりの会計年度 2019 を含めるように orgArchive コレクションを更新するには、次の集計パイプラインを使用します。

  • $matchステージを使用して、fiscal_year2019 に等しいすべてのドキュメントを検索します。

  • $group ステージを使って、従業員を fiscal_yeardept でグループ化します。

  • $projectステージでは、 _idフィールドを抑制し、別のdeptフィールドとfiscal_yearフィールドを追加します。ドキュメントが$mergeに渡されると、 $mergeドキュメントに対して新しい_idフィールドを自動的に生成します。

  • $mergeは結果セットをorgArchiveに書込みます。

    $mergeステージは、一致dept fiscal_yearするfails 場合、 フィールドと フィールド、および のドキュメントと一致します。つまり、同じ部門と会計年度のドキュメントがすでに存在する場合、 $mergeはエラーになります。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )

操作後、orgArchive コレクションには次のドキュメントが含まれます。

{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }

orgArchiveコレクションに、部門"A"および/または"B"の2019のドキュメントがすでに含まれている場合、重複キー エラーのため集計は失敗します。ただし、エラーが発生する前に挿入されたドキュメントはロールバックされません。

一致するドキュメントにkeepExistingを指定すると、集計は一致するドキュメントに影響を与えず、重複キー エラーは発生しません。同様に、 replaceを指定した場合、操作は失敗しませんが、既存のドキュメントが置き換えられます。

デフォルトでは、集計結果内のドキュメントがコレクション内のドキュメントと一致する場合、 $mergeステージはドキュメントをマージします。

サンプルコレクション purchaseorders には、四半期およびリージョン別の発注情報を格納します。

db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )

別のサンプルコレクション reportedsales には、四半期およびリージョンごとに報告された売上情報を格納します。

db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )

レポート作成の目的で、次の形式で四半期ごとにデータを表示するとします。

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

$mergeを使用してpurchaseordersコレクションの結果とreportedsalesコレクションの結果をマージし、新しいコレクションquarterlyreportを作成できます。

quarterlyreport コレクションを作成するには、次のパイプラインを使用できます。

db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第 1 ステージ:

$group ステージでは四半期ごとにグループ化し、$sum を使用して qty フィールドを新しい purchased フィールドに追加します。以下に例を示します。

quarterlyreport コレクションを作成するには、このパイプラインを次のように使用できます。

{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
第 2 ステージ:
$mergeステージでは、ドキュメントを同じデータベース内のquarterlyreportコレクションに書き込みます。 ステージがコレクション内で_idフィールドに一致する 既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。 それ以外の場合、 ステージはドキュメントを挿入します。 最初の作成では、一致するドキュメントはありません。

コレクション内の文書を表示するには、以下の操作を実行します。

db.quarterlyreport.find().sort( { _id: 1 } )

コレクションには、次のドキュメントが含まれます。

{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }

同様に、reportedsales コレクションに対して次の集計パイプラインを実行して、売上結果を quarterlyreport コレクションにマージします。

db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第 1 ステージ:

$group ステージでは四半期ごとにグループ化し、$sum を使用して qty フィールドを新しい sales フィールドに追加します。以下に例を示します。

{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
第 2 ステージ:
$mergeステージでは、ドキュメントを同じデータベース内のquarterlyreportコレクションに書き込みます。ステージがコレクション内で_idフィールド(四半期)に一致する既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。それ以外の場合、ステージはドキュメントを挿入します。

データがマージされた後に quarterlyreport コレクション内のドキュメントを表示するには、次の操作を実行します。

db.quarterlyreport.find().sort( { _id: 1 } )

コレクションには、次のドキュメントが含まれます。

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

$mergeは、ドキュメントが一致する場合にカスタム更新パイプラインを使用できます。whenMatched パイプラインには次のステージがあります。

サンプルコレクション votes には、毎日の投票集計を格納します。次のドキュメントを使用してコレクションを作成します。

db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )

別の例のコレクションmonthlytotals には、最新の月間投票総数が含まれています。次のドキュメントを使用してコレクションを作成します。

db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)

毎日の終わりに、その日の投票が votes コレクションに挿入されます。

db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)

カスタム パイプラインで$mergeを使用して、コレクションmonthlytotals内の既存のドキュメントを更新できます。

db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
第 1 ステージ:

$match ステージでは、特定の日の投票を検索します。以下に例を示します。

{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
第 2 ステージ:

$project ステージでは、_id フィールドを年月の文字列に設定します。以下に例を示します。

{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
第 3 ステージ:

$mergeステージでは、ドキュメントを同じデータベース内のmonthlytotalsコレクションに書き込みます。ステージがコレクション内で_idフィールドに一致する既存のドキュメントを見つけた場合、ステージはパイプラインを使用してthumbsup票とthumbsdown票を追加します。

  • このパイプラインは、結果ドキュメントのフィールドに直接アクセスすることはできません。結果ドキュメントの thumbsup フィールドと thumbsdown フィールドにアクセスするために、パイプラインは $$new 変数(つまり、$$new.thumbsup$new.thumbsdown)を使用します。

  • このパイプラインは、コレクション内の既存のドキュメントの thumbsup フィールドと thumbsdown フィールド、つまり $thumbsup$thumbsdown に直接アクセスできます。

結果のドキュメントによって、既存のドキュメントが置き換えられます。

マージ操作後に monthlytotals コレクション内のドキュメントを表示するには、次の操作を実行します。

db.monthlytotals.find()

コレクションには、次のドキュメントが含まれます。

{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }

$mergeステージのwhenMatchedフィールドで変数を使用できます。変数は、使用する前に定義する必要があります。

次のいずれかまたは両方で変数を定義します。

whenMatched で変数を使用するには次のようにします。

二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

以下のタブは、変数を mergeステージ、aggregate コマンド、またはその両方で定義したときの動作を示します。

$mergeステージレットで変数を定義し、 whenMatchedフィールドの変数を使用できます。

例:

db.cakeSales.insertOne( [
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
] )
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {}
} )
db.cakeSales.find()

例:

  • 次の名称のコレクションを作成します: cakeSales

  • $mergeレットyear 変数を定義するaggregateコマンドを実行し、whenMatchedを使用して年数をcakeSales に追加します

  • retrieves the cakeSales document

出力:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

バージョン 5.0 で追加

aggregateコマンドレットで変数を定義し、 $mergeステージのwhenMatchedフィールドで変数を使用できます。

例:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
whenMatched: [ {
$addFields: { "salesYear": "$$year" } }
] }
}
],
cursor: {},
let : { year: "2020" }
} )
db.cakeSales.find()

例:

  • 次の名称のコレクションを作成します: cakeSales

  • aggregateコマンドletyear変数を定義するaggregateコマンドを実行しwhenMatchedを使用してcakeSalesに年数を追加します

  • retrieves the cakeSales document

出力:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

変数は、 $mergeステージで定義できます。また、MongoDB 5.0以降では、 aggregateコマンドでも定義できます。

$mergeステージとaggregateコマンドで同じ名前の 2 つの変数が定義されている場合は、 $mergeステージ変数が使用されます。

この例では、year: "2020"$merge ステージ変数がyear: "2019"aggregateコマンド変数の代わりに使用されています。

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )
db.cakeSales.find()

出力:

{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}

戻る

$match