$merge(集計)
定義
注意
このページでは、集計パイプラインの結果をコレクションに出力する $merge
ステージについて説明します。複数のドキュメントを 1 つのドキュメントにマージする $mergeObjects
演算子については、$mergeObjects
を参照してください。
$merge
集計パイプラインの結果を指定されたコレクションに書き込みます。
$merge
演算子はパイプラインの最後のステージ である必要があります。$merge
ステージ:同じデータベースまたは異なるデータベース内のコレクションに出力できます。
集約されている同じコレクションに出力できます。 詳細については、 「 集計されているのと同じコレクションへの出力 」を参照してください。
集計パイプラインで
$merge
または$out
ステージを使用する場合は、次の点を考慮してください。MongoDB 5.0 以降では、クラスター内のすべてのノードの featureCompatibilityVersion が
5.0
以上に設定されており、読み込み設定(read preference) でセカンダリ読み取りが許可されている場合、$merge
ステージのパイプラインをレプリカセットのセカンダリノードで実行できます。MongoDB の以前のバージョンでは、
$out
または$merge
ステージを持つパイプラインは常にプライマリ ノードで実行され、読み込み設定 (read preference) は考慮されませんでした。
出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。
結果(新しいドキュメントの挿入、ドキュメントのマージ、ドキュメントの置換、既存のドキュメントの保持、操作の失敗、カスタムアップデートパイプラインによるドキュメントの処理)を既存のコレクションに組み込むことができます。
シャーディングされたコレクションに出力できます。入力コレクションもシャーディング可能です。
集計結果もコレクションに出力する
$out
ステージとの比較については、「$merge
と$out
の比較 」を参照してください。
注意
オンデマンドのマテリアライズドビュー
$merge
は、コレクションの完全な置換を行うのではなく、パイプラインの結果を既存の出力コレクションに組み込むことができます。この機能により、ユーザーはオンデマンドのマテリアライズドビューを作成できます。ここでは、パイプラインの実行時に出力コレクションの内容が段階的に更新されます。
このユースケースの詳細については、このページの例だけでなく、オンデマンド マテリアライズドビューも参照してください。
マテリアライズドビューは読み取り専用のビューとは別物です。読み取り専用ビューの作成については、「読み取り専用ビュー」を参照してください。
互換性
次の環境でホストされる配置には $merge
を使用できます。
MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです
MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン
MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン
構文
$merge
の構文は次のとおりです。
{ $merge: { into: <collection> -or- { db: <db>, coll: <collection> }, on: <identifier field> -or- [ <identifier field1>, ...], // Optional let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
以下に例を挙げます。
{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
同じデータベース内のコレクションへの書き込みを含め、 $merge
のすべてのデフォルト オプションを使用する場合は、簡略化された形式を使用できます。
{ $merge: <collection> } // Output collection is in the same database
$merge
は、次のフィールドを持つドキュメントを取得します。
フィールド | 説明 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
出力コレクション。次のいずれかを指定します。
出力コレクションが存在しない場合は、
出力コレクションはシャーディングされたコレクションにすることができます。 | |||||||||||
任意。ドキュメントのユニークな識別子として機能するフィールド。識別子は、結果ドキュメントが出力コレクション内の既存のドキュメントと一致するかどうかを判断します。次のいずれかを指定します。
指定されたフィールドの場合、次のようになります。
オンのデフォルト値は出力コレクションによって異なります。
| |||||||||||
任意。 次のいずれかを指定できます。
| |||||||||||
任意。 whenMatched パイプライン で使用する変数を指定します。 以下のように、変数名と値の式を使ってドキュメントを指定します。
指定されていない場合は、デフォルトで whenMatched パイプライン内の変数にアクセスするには、次のようにします。 二重ドル記号($$)プレフィックスと変数名を 例については、「変数を使用したマージのカスタマイズ 」を参照してください。 | |||||||||||
Considerations
_id
フィールド生成
集計パイプライン結果のドキュメントに_id
フィールドが存在しない場合は、 $merge
ステージによって自動的にフィールドが生成されます。
たとえば、次の集計パイプラインでは、 $project
は$merge
に渡されるドキュメントから_id
フィールドを除外します。$merge
がこれらのドキュメントを"newCollection"
に書き込むと、 $merge
は新しい_id
フィールドと値を生成します。
db.sales.aggregate( [ { $project: { _id: 0 } }, { $merge : { into : "newCollection" } } ] )
出力コレクションが存在しない場合の新しいコレクションの作成
指定された出力コレクションが存在しない場合は、 $merge
操作によって新しいコレクションが作成されます。
出力コレクションは、
$merge
が最初のドキュメントをコレクションに書込んだときに作成され、すぐに表示されます。集計が失敗した場合、エラーの前に
$merge
によって完了した書き込みはロールバックされません。
注意
レプリカセットまたはスタンドアロンでは、出力データベースが存在しない場合、 $merge
によってデータベースも作成されます。
シャーディングされたクラスターの場合、指定された出力データベースがすでに存在している必要があります。
出力コレクションが存在しない場合、 $merge
ではオン識別子が_id
フィールドである必要があります。 存在しないコレクションに対して別のon
フィールド値を使用するには、まずフィールドに一意なインデックスを作成してコレクションを作成します。 たとえば、出力コレクションnewDailySales201905
が存在せず、オン識別子としてsalesDate
フィールドを指定する場合は次のようになります。
db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } ) db.sales.aggregate( [ { $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } }, { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } }, { $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } }, { $merge : { into : "newDailySales201905", on: "salesDate" } } ] )
シャーディングされたコレクションへの出力
$merge
ステージはシャーディングされたコレクションに出力できます。 出力コレクションがシャーディングされると、 $merge
は_id
フィールドとすべてのシャードキーフィールドをデフォルトのオン識別子として使用します。 デフォルトを上書きする場合、オン識別子にはすべてのシャードキーフィールドを含める必要があります。
{ $merge: { into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" }, on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
たとえば、sh.shardCollection()
メソッドを使用して、postcode
フィールドをシャードキーとして持つ新しいシャーディングされたコレクションnewrestaurants
を作成します。
sh.shardCollection( "exampledb.newrestaurants", // Namespace of the collection to shard { postcode: 1 }, // Shard key );
newrestaurants
コレクションには、月別 ( date
フィールド)および郵便番号別 (シャードキー)の新しいレストランのオープンに関する情報を含むドキュメントが含まれます。具体的には、オン識別子は["date", "postcode"]
です (フィールドの順序は重要ではありません)。$merge
には、オン識別子フィールドに対応するキーを持つユニークインデックスが必要なので、ユニークインデックスを作成します(フィールドの順序は関係ありません)。 [ 1 ]
use exampledb db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )
シャーディングされたコレクションrestaurants
と一意のインデックスを作成すると、 $merge
を使用して集計結果をこのコレクションに出力でき、この例のように[ "date", "postcode" ]
に一致します。
use exampledb db.openings.aggregate([ { $group: { _id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" }, restaurants: { $push: "$restaurantName" } } }, { $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } }, { $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } } ])
[1] | sh.shardCollection() メソッドは、シャードキーが範囲ベースであり、コレクションが空であり、シャードキーにユニークインデックスがまだ存在しない場合に、{ unique: true
} オプションを渡すとシャードキーにユニークインデックスを作成することもできます。前述の例では、on 識別子はシャードキーと別のフィールドであるため、対応するインデックスを作成するための別の操作が必要です。 |
ドキュメントの置換($merge
)とコレクションの置換($out
)<a class=\" \" href=\" \" title=\" \"><svg xmlns=\" \" width=\" \" height=\" \" fill=\" \" viewbox=\" \" class=\" \" role=\" \" aria-label=\" \"><path fill=\" \" d=\" \"> <path fill=\" \" d=\" \">
$merge
は、オン仕様に基づいて一致するドキュメントが集計結果に含まれている場合、出力コレクション内の既存のドキュメントを置き換えることができます。 そのため、集計結果にコレクション内のすべての既存のドキュメントと一致するドキュメントが含まれ、$merge
whenMatched に対して "replace" が指定されている場合、 は既存のコレクション内のすべてのドキュメントを置き換えることができます。
ただし、集計結果に関係なく既存のコレクションを置き換えるには、代わりに $out
を使用します。
既存のドキュメントと およびシャードキー値<a class=\" \" href=\" \" title=\" \"><svg xmlns=\" \" width=\" \" height=\" \" fill=\" \" viewbox=\" \" class=\" \"_id
role=\" \" aria-label=\" \"><path fill=\" \" d=\" \"> <path fill=\" \" d=\" \">
ユニークインデックス制約
$merge
がオンフィールドに使用する一意のインデックスが集計の途中で削除された場合、集計が強制終了される保証はありません。集計が続行される場合、ドキュメントに重複する on
フィールド値が存在しないという保証はありません。
$merge
が出力コレクションの一意のインデックスに違反するドキュメントを書込もうとした場合、操作でエラーが生成されます。 例:
オン フィールドのインデックス以外の一意のインデックスに違反する、一致しないドキュメントを挿入します。
コレクション内に一致するドキュメントがある場合は失敗します。具体的には、この操作は、オンフィールドの一意のインデックスに違反する一致するドキュメントを挿入しようとします。
オン フィールドのインデックス以外の一意のインデックスに違反するドキュメントとなる一致するドキュメントをマージします。
スキーマ検証
コレクションでスキーマ検証が使用されており、 validationAction
がerror
に設定されている場合、無効なドキュメントを挿入したり、 $merge
を使用して無効な値を持つドキュメントを更新したりすると、 MongoServerError
がスローされ、ドキュメントはターゲット コレクションに書き込まれません。 無効なドキュメントが複数ある場合、最初に見つかった無効なドキュメントのみがエラーをスローします。 すべての有効なドキュメントはターゲット コレクションに書き込まれ、すべての無効なドキュメントは書込みに失敗します。
whenMatched
パイプラインの動作
$merge
ステージに対して次の すべて が当てはまる場合、 $merge
はドキュメントを出力コレクションに直接挿入します。
whenMatchedの値は集計パイプラインであり、
whenNotMatchedの値は
insert
で、かつ出力コレクションに一致するドキュメントがありません。
$merge
は、ドキュメントを出力コレクションに直接挿入します。
$merge
と$out
の比較
$merge
の導入により、MongoDB は集計パイプラインの結果をコレクションに書き込むための 2 つのステージ$merge
と$out
を提供します。
|
|
|
|
|
|
|
|
|
|
集計されているのと同じコレクションへの出力
警告
$merge
の出力が集約されている同じコレクションに出力される場合、ドキュメントが複数回更新されたり、操作によって無限ループが発生したりする可能性があります。 この動作は、 $merge
によって実行されるアップデートによって、ディスクに保存されているドキュメントの物理的な場所が変更された場合に発生します。 ドキュメントの物理的な場所が変更されると、 $merge
はそのドキュメントを完全に新しいドキュメントとして表示し、追加のアップデートが行われることがあります。 この動作の詳細については、「 日付 の問題 」を参照してください。
$merge
は、集約されている同じコレクションに出力できます。$lookup
など、パイプラインの他のステージに表示されるコレクションに出力することもできます。
制限事項
制限事項 | 説明 |
---|---|
集計パイプラインでは、 トランザクション $merge 内で を使用できません。 | |
集計パイプラインでは、 $merge を使用して時系列コレクションに出力することはできません。 | |
Separate from materialized view | |
$lookup ステージ | |
$facet ステージ | |
$unionWith ステージ | |
"linearizable" 読み取り保証 (read concern) |
|
例
オンデマンドのマテリアライズドビュー: 初期作成
出力コレクションが存在しない場合、 $merge
はコレクションを作成します。
たとえば、zoo
データベース内の salaries
という名前のコレクションには、従業員の給与と所属部門の履歴を格納します。
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 } ])
$group
} ステージと$merge
ステージを使用して、現在salaries
コレクションに含まれているデータからbudgets
( reporting
データベース内)という名前のコレクションをまず作成できます。
注意
レプリカセット またはスタンドアロン配置では、出力データベースが存在しない場合、 $merge
によってデータベースも作成されます。
シャーディングされたクラスター配置の場合、指定された出力データベースがすでに存在している必要があります。
db.getSiblingDB("zoo").salaries.aggregate( [ { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$group
ステージを使って、給与をfiscal_year
とdept
でグループ化します。$merge
ステージは、先行する$group
ステージの出力をreporting
データベース内のbudgets
コレクションに書き込みます。
新しい budgets
コレクション内のドキュメントを表示するには次のようにします。
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
budgets
コレクションには次のドキュメントが含まれます。
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
オンデマンドのマテリアライズドビュー: データの更新/置換
次の例では、前述の例のコレクションを使用します。
例の salaries
コレクションには、以下のように従業員の給与と所属部門の履歴が含まれています。
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
例の budgets
コレクションには、以下のように累積年間予算が含まれています。
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
現在の会計年度(この例では、2019
)では、以下のように新しい従業員が salaries
コレクションに追加され、新しい人数が翌年に事前に割り当てられます。
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 }, { "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 }, { "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 }, { "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 } ])
新しい給与情報を反映するように budgets
コレクションを更新するには、次の集計パイプラインは以下を使用します。
$match
ステージを使って、fiscal_year
が2019
以降であるすべてのドキュメントを検索します。$group
ステージを使って、給与をfiscal_year
とdept
でグループ化します。$merge
を使用して結果セットをbudgets
コレクションに書込み、同じ_id
値を持つドキュメント(この例では、会計年度と部門のあるドキュメント)を置き換えます。 コレクション内に一致しないドキュメントの場合、$merge
は新しいドキュメントを挿入します。
db.getSiblingDB("zoo").salaries.aggregate( [ { $match : { fiscal_year: { $gte : 2019 } } }, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
集計が実行された後、次のように budgets
コレクション内のドキュメントを表示します。
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
budgets
コレクションには、2019 会計年度の新しい給与データが組み込まれ、2020 会計年度の新しいドキュメントが追加されています。
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 } { "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }
新しいデータの挿入に限定
$merge
がコレクション内の既存のデータを上書きしないようにするには、 whenMatchedをkeepExistingまたは失敗に設定します。
zoo
データベース内の salaries
コレクションの例には、従業員の給与と部門の履歴が含まれています。
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
reporting
データベースのコレクション orgArchive
には、過去の会計年度における部門の履歴としての組織レコードが含まれています。アーカイブされたレコードは変更しないでください。
{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 } { "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 } { "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 } { "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
orgArchive
コレクションには、fiscal_year
フィールドと dept
フィールドの一意の複合インデックスがあります。具体的には、会計年度と部門の 1 つの組み合わせに対して最大 1 つのレコードが存在する必要があります。
db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )
現在の会計年度末(この例では2019
)の時点で、salaries
コレクションには次のドキュメントが含まれています。
{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 } { "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 } { "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 } { "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 } { "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 } { "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 } { "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 } { "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 } { "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 } { "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 } { "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 } { "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 } { "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 } { "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
終了したばかりの会計年度 2019
を含めるように orgArchive
コレクションを更新するには、次の集計パイプラインを使用します。
$match
ステージを使用して、fiscal_year
が2019
に等しいすべてのドキュメントを検索します。$group
ステージを使って、従業員をfiscal_year
とdept
でグループ化します。$project
ステージでは、_id
フィールドを抑制し、別のdept
フィールドとfiscal_year
フィールドを追加します。ドキュメントが$merge
に渡されると、$merge
ドキュメントに対して新しい_id
フィールドを自動的に生成します。$merge
は結果セットをorgArchive
に書込みます。$merge
ステージは、一致dept
fiscal_year
するfails
場合、 フィールドと フィールド、および のドキュメントと一致します。つまり、同じ部門と会計年度のドキュメントがすでに存在する場合、$merge
はエラーになります。
db.getSiblingDB("zoo").salaries.aggregate( [ { $match: { fiscal_year: 2019 }}, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } }, { $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } }, { $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } } ] )
操作後、orgArchive
コレクションには次のドキュメントが含まれます。
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 } { "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 } { "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }
orgArchive
コレクションに、部門"A"
および/または"B"
の2019のドキュメントがすでに含まれている場合、重複キー エラーのため集計は失敗します。ただし、エラーが発生する前に挿入されたドキュメントはロールバックされません。
一致するドキュメントにkeepExistingを指定すると、集計は一致するドキュメントに影響を与えず、重複キー エラーは発生しません。同様に、 replaceを指定した場合、操作は失敗しませんが、既存のドキュメントが置き換えられます。
複数のコレクションからの結果のマージ
デフォルトでは、集計結果内のドキュメントがコレクション内のドキュメントと一致する場合、 $merge
ステージはドキュメントをマージします。
サンプルコレクション purchaseorders
には、四半期およびリージョン別の発注情報を格納します。
db.purchaseorders.insertMany( [ { _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") }, { _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") }, { _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") }, { _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") }, { _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") }, { _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") }, ] )
別のサンプルコレクション reportedsales
には、四半期およびリージョンごとに報告された売上情報を格納します。
db.reportedsales.insertMany( [ { _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") }, { _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") }, { _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") }, { _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") }, ] )
レポート作成の目的で、次の形式で四半期ごとにデータを表示するとします。
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
$merge
を使用してpurchaseorders
コレクションの結果とreportedsales
コレクションの結果をマージし、新しいコレクションquarterlyreport
を作成できます。
quarterlyreport
コレクションを作成するには、次のパイプラインを使用できます。
db.purchaseorders.aggregate( [ { $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第 1 ステージ:
$group
ステージでは四半期ごとにグループ化し、$sum
を使用してqty
フィールドを新しいpurchased
フィールドに追加します。以下に例を示します。quarterlyreport
コレクションを作成するには、このパイプラインを次のように使用できます。{ "_id" : "2019Q2", "purchased" : 1700 } { "_id" : "2019Q1", "purchased" : 1200 } - 第 2 ステージ:
$merge
ステージでは、ドキュメントを同じデータベース内のquarterlyreport
コレクションに書き込みます。 ステージがコレクション内で_id
フィールドに一致する 既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。 それ以外の場合、 ステージはドキュメントを挿入します。 最初の作成では、一致するドキュメントはありません。
コレクション内の文書を表示するには、以下の操作を実行します。
db.quarterlyreport.find().sort( { _id: 1 } )
コレクションには、次のドキュメントが含まれます。
{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }
同様に、reportedsales
コレクションに対して次の集計パイプラインを実行して、売上結果を quarterlyreport
コレクションにマージします。
db.reportedsales.aggregate( [ { $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第 1 ステージ:
$group
ステージでは四半期ごとにグループ化し、$sum
を使用してqty
フィールドを新しいsales
フィールドに追加します。以下に例を示します。{ "_id" : "2019Q2", "sales" : 500 } { "_id" : "2019Q1", "sales" : 1950 } - 第 2 ステージ:
$merge
ステージでは、ドキュメントを同じデータベース内のquarterlyreport
コレクションに書き込みます。ステージがコレクション内で_id
フィールド(四半期)に一致する既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。それ以外の場合、ステージはドキュメントを挿入します。
データがマージされた後に quarterlyreport
コレクション内のドキュメントを表示するには、次の操作を実行します。
db.quarterlyreport.find().sort( { _id: 1 } )
コレクションには、次のドキュメントが含まれます。
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
パイプラインを使用したマージのカスタマイズ
$merge
は、ドキュメントが一致する場合にカスタム更新パイプラインを使用できます。whenMatched パイプラインには次のステージがあります。
$addFields
およびそのエイリアス$set
$replaceRoot
およびそのエイリアス$replaceWith
サンプルコレクション votes
には、毎日の投票集計を格納します。次のドキュメントを使用してコレクションを作成します。
db.votes.insertMany( [ { date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 }, { date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 }, { date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 }, { date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 }, { date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 }, { date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 } ] )
別の例のコレクションmonthlytotals
には、最新の月間投票総数が含まれています。次のドキュメントを使用してコレクションを作成します。
db.monthlytotals.insertOne( { "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 } )
毎日の終わりに、その日の投票が votes
コレクションに挿入されます。
db.votes.insertOne( { date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 } )
カスタム パイプラインで$merge
を使用して、コレクションmonthlytotals
内の既存のドキュメントを更新できます。
db.votes.aggregate([ { $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } }, { $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } }, { $merge: { into: "monthlytotals", on: "_id", whenMatched: [ { $addFields: { thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] }, thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] } } } ], whenNotMatched: "insert" } } ])
- 第 1 ステージ:
$match
ステージでは、特定の日の投票を検索します。以下に例を示します。{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 } - 第 2 ステージ:
$project
ステージでは、_id
フィールドを年月の文字列に設定します。以下に例を示します。{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" } - 第 3 ステージ:
$merge
ステージでは、ドキュメントを同じデータベース内のmonthlytotals
コレクションに書き込みます。ステージがコレクション内で_id
フィールドに一致する既存のドキュメントを見つけた場合、ステージはパイプラインを使用してthumbsup
票とthumbsdown
票を追加します。このパイプラインは、結果ドキュメントのフィールドに直接アクセスすることはできません。結果ドキュメントの
thumbsup
フィールドとthumbsdown
フィールドにアクセスするために、パイプラインは$$new
変数(つまり、$$new.thumbsup
と$new.thumbsdown
)を使用します。このパイプラインは、コレクション内の既存のドキュメントの
thumbsup
フィールドとthumbsdown
フィールド、つまり$thumbsup
と$thumbsdown
に直接アクセスできます。
結果のドキュメントによって、既存のドキュメントが置き換えられます。
マージ操作後に monthlytotals
コレクション内のドキュメントを表示するには、次の操作を実行します。
db.monthlytotals.find()
コレクションには、次のドキュメントが含まれます。
{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }
変数を使用したマージのカスタマイズ
$merge
ステージのwhenMatchedフィールドで変数を使用できます。変数は、使用する前に定義する必要があります。
次のいずれかまたは両方で変数を定義します。
whenMatched で変数を使用するには次のようにします。
二重ドル記号($$)プレフィックスと変数名を $$<variable_name>
形式で指定します。たとえば、$$year
のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field>
という形式でフィールドを含めることもできます。たとえば、$$year.month
のような形式です。
以下のタブは、変数を mergeステージ、aggregate コマンド、またはその両方で定義したときの動作を示します。
まージ ステージで定義された変数の使用
$merge
ステージレットで変数を定義し、 whenMatchedフィールドの変数を使用できます。
例:
db.cakeSales.insertOne( [ { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ] ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {} } ) db.cakeSales.find()
例:
次の名称のコレクションを作成します:
cakeSales
$merge
レットでyear
変数を定義するaggregate
コマンドを実行し、whenMatchedを使用して年数をcakeSales
に追加しますretrieves the
cakeSales
document
出力:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580, "salesTrend" : "up", "salesYear" : "2020" }
集計コマンドで定義された変数の使用
バージョン 5.0 で追加
aggregate
コマンドレットで変数を定義し、 $merge
ステージのwhenMatchedフィールドで変数を使用できます。
例:
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2020" } } ) db.cakeSales.find()
例:
次の名称のコレクションを作成します:
cakeSales
aggregate
コマンドletのyear
変数を定義するaggregate
コマンドを実行しwhenMatchedを使用してcakeSales
に年数を追加しますretrieves the
cakeSales
document
出力:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580, "salesTrend" : "up", "salesYear" : "2020" }
マージ ステージと集計コマンドで定義された変数を使用する
変数は、 $merge
ステージで定義できます。また、MongoDB 5.0以降では、 aggregate
コマンドでも定義できます。
$merge
ステージとaggregate
コマンドで同じ名前の 2 つの変数が定義されている場合は、 $merge
ステージ変数が使用されます。
この例では、year: "2020"
$merge
ステージ変数がyear: "2019"
aggregate
コマンド変数の代わりに使用されています。
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2019" } } ) db.cakeSales.find()
出力:
{ _id: 1, flavor: 'chocolate', salesTotal: 1580, salesTrend: 'up', salesYear: '2020' }