$merge(聚合)
定义
注意
本页介绍了 $merge
阶段,该阶段将聚合管道结果输出到集合中。有关将多个文档合并为单个文档的 $mergeObjects
操作符,请参阅 $mergeObjects
。
$merge
将聚合管道的结果写入指定的集合。
$merge
操作符必须是管道的最后一个阶段。$merge
阶段:可以输出到相同或不同数据库中的集合。
可以输出到正在聚合的同一集合。有关更多信息,请参阅输出到正在聚合的同一集合。。
在 聚合管道使用
$merge
$out
阶段时,请考虑以下几点:从MongoDB 5.0 开始,如果集群中的所有节点都将 featureCompatibilityVersion 设置为
5.0
或更高,且读取偏好允许读取从节点,那么具有$merge
阶段的管道就可以在副本集从节点上运行。在早期的 MongoDB 版本中,具有
$out
或$merge
阶段的管道始终在主节点上运行,并且不考虑读取偏好。
如果输出集合不存在,则创建一个新集合。
可将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)并入现有集合。
可输出到分片集合。输入集合也可以是分片的。
有关与同样将聚合结果输出到集合中的
$out
阶段的比较,请参阅$merge
和$out
比较。
兼容性
可以使用 $merge
查找托管在以下环境中的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
语法
$merge
通过以下语法实现:
{ $merge: { into: <collection> -or- { db: <db>, coll: <collection> }, on: <identifier field> -or- [ <identifier field1>, ...], // Optional let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
例如:
{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
如果使用了 $merge
的所有默认选项(包括写入同一数据库中的集合),则可使用简化形式:
{ $merge: <collection> } // Output collection is in the same database
$merge
接受包含以下字段的文档:
字段 | 说明 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
可选。充当文档唯一标识符的一个或多个字段。标识符决定结果文档是否与输出集合中的现有文档相匹配。指定以下任一项:
对于指定的一个或多个字段:
on 的默认值取决于输出集合:
| |||||||||||
可选。 您可以指定以下任一项:
| |||||||||||
可选。指定在 WhenMatched 管道 中使用的变量。 指定包含变量名和值表达式的文档:
如果未指定,默认为 要访问 whenMatched 管道中的变量,请执行以下操作: 以 有关示例,请参阅使用变量自定义合并。 | |||||||||||
Considerations
_id
字段生成
如果聚合管道结果中的文档中不存在 _id
字段,则 $merge
阶段会自动生成该字段。
例如,在以下聚合管道中,$project
从传递给 $merge
的文档中排除 _id
字段。当 $merge
将这些文档写入 "newCollection"
时,$merge
会生成一个新的 _id
字段和值。
db.sales.aggregate( [ { $project: { _id: 0 } }, { $merge : { into : "newCollection" } } ] )
如果输出集合不存在,则创建一个新集合
如果指定的输出集合不存在,则 $merge
操作会创建一个新集合。
如果输出集合不存在,$merge
要求将 on 标识符作为 _id
字段。要为不存在的集合使用不同的 on
字段值,可以先在所需字段上创建唯一索引来创建集合。例如,如果输出集合 newDailySales201905
不存在,而您想指定 salesDate
字段作为 on 标识符:
db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } ) db.sales.aggregate( [ { $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } }, { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } }, { $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } }, { $merge : { into : "newDailySales201905", on: "salesDate" } } ] )
输出到分片集合
$merge
阶段可输出到分片集合。对输出集合进行分片时,$merge
使用 _id
字段和所有分片键字段作为默认 on 标识符。如果覆盖此默认值,on 标识符则须包含所有分片键字段:
{ $merge: { into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" }, on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
例如,使用 sh.shardCollection()
方法创建新的分片集合 newrestaurants
,其中 postcode
字段作为分片键。
sh.shardCollection( "exampledb.newrestaurants", // Namespace of the collection to shard { postcode: 1 }, // Shard key );
newrestaurants
集合将包含按月份(date
字段)和邮政编码(分片键)列出的新餐厅开业信息的文档;具体来说,on 标识符是 ["date", "postcode"]
(字段的顺序无关紧要)。由于 $merge
需要唯一索引,并且具有对应 on 标识符字段的键,因此请创建唯一索引(字段的顺序无关紧要):[1]
use exampledb db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )
通过创建分片集合 restaurants
和唯一索引后,可以使用 $merge
将聚合结果输出到此集合,匹配 [ "date", "postcode" ]
,如下例所示:
use exampledb db.openings.aggregate([ { $group: { _id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" }, restaurants: { $push: "$restaurantName" } } }, { $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } }, { $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } } ])
[1] | 在传递 { unique: true
} 选项时,sh.shardCollection() 方法还可以在分片键上创建唯一索引,前提是:分片键基于范围,集合为空,并且分片键上的唯一索引尚不存在。在前面的示例中,由于 on 标识符是分片键和另一个字段,因此需要单独的操作来创建相应的索引。 |
替换文档 ($merge
) 与替换集合 ($out
)
$merge
可以替换输出集合中的现有文档,前提是聚合结果包含根据 on 规范匹配的一个或多个文档。因此,如果聚合结果包括集合中所有现有文档的匹配文档,并且您为 whenMatched 指定“replace”,则 $merge
可以替换现有集合中的所有文档。
但是,在不考虑聚合结果的情况下,如果要替换现有集合,请使用 $out
。
现有文档和 _id
以及分片键值
唯一索引约束
如果 $merge
为 on 字段使用的唯一索引在聚合过程中被删除,则无法保证会终止聚合。如果继续聚合,则无法保证文档中没有重复的 on
字段值。
如果 $merge
尝试写入的文档违反输出集合上的任何唯一索引,则操作会产生错误。例如:
模式验证
如果您的集合使用模式验证并将 validationAction
设置为 error
,则插入无效文档或使用 $merge
更新具有无效值的文档会抛出 MongoServerError
,并且该文档不会写入目标集合。如果有多个无效文档,则只有出现的第一个无效文档会引发错误。所有有效文档都写入目标集合,所有无效文档都会写入失败。
whenMatched
管道行为
对于 $merge
阶段,如果以下条件全部为真,则 $merge
将文档直接插入到输出集合:
whenMatched 的值是一个聚合管道,
whenNotMatched 的值为
insert
,并且输出集合中没有匹配的文档,
$merge
将文档直接插入到输出集合中。
$merge
和 $out
比较
随着 $merge
的引入,MongoDB 提供两个阶段,即 $merge
和 $out
,用于将聚合管道的结果写入集合:
|
|
|
|
|
|
|
|
|
|
输出到正在聚合的同一集合
警告
限制
限制 | 说明 |
---|---|
聚合管道不能使用 $merge 输出到时间序列集合。 | |
Separate from materialized view | |
$lookup 阶段 | |
$facet 阶段 | |
$unionWith 阶段 | |
"linearizable" 读关注 (read concern) |
|
示例
按需物化视图:初始创建
如果输出集合不存在,则 $merge
会创建该集合。
例如,zoo
数据库中的 salaries
集合填充有员工工资和部门历史:
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 } ])
您可以使用 $group
和 $merge
阶段从当前位于 salaries
集合中的数据初始创建名为 budgets
的集合(在 reporting
数据库中):
注意
对于副本集或独立部署,如果输出数据库不存在,则 $merge
也会创建该数据库。
对于分片集群部署,指定输出数据库必须已经存在。
db.getSiblingDB("zoo").salaries.aggregate( [ { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
要查看新 budgets
集合中的文档:
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
budgets
集合包含以下文档:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
按需物化视图:更新/替换数据
以下示例将使用上一示例中的集合。
示例 salaries
集合包含员工工资和部门历史:
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
示例 budgets
集合包含年度累积预算:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
在本财政年度(本例中为 2019
),salaries
集合中添加了新员工,并为下一年度预分配了新员工人数:
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 }, { "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 }, { "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 }, { "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 } ])
要更新 budgets
集合以反映新的工资信息,使用以下聚合管道:
$match
阶段,查找fiscal_year
大于或等于2019
的所有文档:$group
阶段以按fiscal_year
和dept
对这些工资进行分组。$merge
将结果集写入budgets
集合,并替换文档,被替换文档具有相同的_id
值(在本示例中,是具有财政年度和部门的文档)。对于集合中没有匹配项的文档,$merge
会插入新文档。
db.getSiblingDB("zoo").salaries.aggregate( [ { $match : { fiscal_year: { $gte : 2019 } } }, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
聚合运行后,查看 budgets
集合中的文档:
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
budgets
集合包含 2019 财年的新薪资数据,并添加 2020 财年的新文档:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 } { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 } { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 } { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 } { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 } { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 } { "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }
仅插入新数据
为确保 $merge
不会覆盖集合中的现有数据,请将 whenMatched 设置为 keepExisting 或 fail。
zoo
数据库中的示例 salaries
集合包含员工薪资和部门历史记录:
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
reporting
数据库中的集合 orgArchive
包含过去财政年度的部门组织历史记录。不得修改存档记录。
{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 } { "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 } { "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 } { "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
orgArchive
集合具有针对 fiscal_year
和 dept
字段的唯一复合索引。具体而言,同一财政年度与部门的组合最多应有一条记录:
db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )
在当前财经年度结束时(本例中为 2019
),salaries
集合包含以下文档:
{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 } { "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 } { "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 } { "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 } { "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 } { "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 } { "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 } { "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 } { "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 } { "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 } { "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 } { "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 } { "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 } { "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
要更新 orgArchive
集合以包含刚刚结束的财政年度 2019
,请使用以下聚合管道:
$match
阶段,查找fiscal_year
等于2019
的所有文档。$group
阶段以按fiscal_year
和dept
对这些员工进行分组。$project
阶段抑制_id
字段并添加单独的dept
和fiscal_year
字段。将文档传递给$merge
时,$merge
会自动为这些文档生成新的_id
字段。$merge
将结果集写入orgArchive
。$merge
阶段基于dept
和fiscal_year
字段匹配文档,并且在匹配时会fails
。也就是说,如果同一部门和财政年度的文档已经存在,则发生$merge
错误。
db.getSiblingDB("zoo").salaries.aggregate( [ { $match: { fiscal_year: 2019 }}, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } }, { $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } }, { $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } } ] )
操作完成后,orgArchive
集合包含以下文档:
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 } { "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 } { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 } { "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }
如果 orgArchive
集合已经包含部门 "A"
和/或 "B"
的 2019 年文档,则由于重复密钥错误,聚合会失败。但是,在错误发生之前插入的任何文档都不会回滚。
如果为匹配文档指定 keepExisting,则聚合不会影响匹配文档,也不会出现重复键错误。同样,如果指定 replace,操作不会失败,但会替换现有文档。
合并多个集合的结果
默认情况下,如果聚合结果中的文档与集合中的文档相匹配,$merge
阶段就会合并文档。
示例集合 purchaseorders
按季度和区域填充了采购订单信息:
db.purchaseorders.insertMany( [ { _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") }, { _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") }, { _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") }, { _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") }, { _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") }, { _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") }, ] )
另一示例集合 reportedsales
则包含按季度和地区报告的销售信息:
db.reportedsales.insertMany( [ { _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") }, { _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") }, { _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") }, { _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") }, ] )
假设出于报告目的,您希望根据以下格式按季度查看数据:
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
您可以使用 $merge
来合并来自 purchaseorders
集合和 reportedsales
集合的结果,从而创建一个新的集合 quarterlyreport
。
要创建 quarterlyreport
集合,可以使用以下管道:
db.purchaseorders.aggregate( [ { $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第一个阶段:
$group
阶段按季度分组,并使用$sum
将qty
字段添加到新的purchased
字段中。例如:要创建
quarterlyreport
集合,您可以使用此管道:{ "_id" : "2019Q2", "purchased" : 1700 } { "_id" : "2019Q1", "purchased" : 1200 } - 第二阶段:
$merge
阶段会将这些文档写入同一数据库中的quarterlyreport
集合。如果此阶段在集合中找到与_id
字段匹配的现有文档,此阶段则会合并这些匹配的文档。否则,该阶段会插入文档。对于初始创建,所有文档都不应匹配。
要查看集合中的文档,运行以下操作:
db.quarterlyreport.find().sort( { _id: 1 } )
该集合包含以下文档:
{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }
同样,针对 reportedsales
集合运行以下聚合管道,将销售结果合并到 quarterlyreport
集合中。
db.reportedsales.aggregate( [ { $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第一个阶段:
$group
阶段按季度分组,并使用$sum
将qty
字段添加到新的sales
字段中。例如:{ "_id" : "2019Q2", "sales" : 500 } { "_id" : "2019Q1", "sales" : 1950 } - 第二阶段:
$merge
阶段将这些文档写入同一数据库中的quarterlyreport
集合。如果该阶段在集合中找到与_id
字段(季度)匹配的现有文档,则该阶段会合并匹配文档。否则,该阶段会插入文档。
要在合并数据后查看 quarterlyreport
集合中的文档,请运行以下操作:
db.quarterlyreport.find().sort( { _id: 1 } )
该集合包含以下文档:
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 } { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
使用管道自定义合并
当文档匹配时,$merge
可以使用自定义更新管道。whenMatched 管道可以包含以下阶段:
$addFields
及其别名$set
$replaceRoot
及其别名$replaceWith
示例集合 votes
包含每日投票记录。创建包含以下文档的集合:
db.votes.insertMany( [ { date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 }, { date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 }, { date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 }, { date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 }, { date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 }, { date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 } ] )
另一个示例集合 monthlytotals
包含最新的每月投票总数。使用以下文档创建集合:
db.monthlytotals.insertOne( { "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 } )
每天结束时,将当天的选票插入 votes
集合中:
db.votes.insertOne( { date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 } )
可以将 $merge
与自定义管道一起使用来更新集合 monthlytotals
中的现有文档:
db.votes.aggregate([ { $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } }, { $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } }, { $merge: { into: "monthlytotals", on: "_id", whenMatched: [ { $addFields: { thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] }, thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] } } } ], whenNotMatched: "insert" } } ])
- 第一个阶段:
$match
阶段会查找特定日期的投票。例如:{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 } - 第二阶段:
$project
阶段将_id
字段设置为年月字符串。例如:{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" } - 第三阶段:
$merge
阶段会将这些文档写入同一数据库中的monthlytotals
集合。如果该阶段在集合中找到与_id
字段匹配的现有文档,则该阶段将使用管道添加thumbsup
投票和thumbsdown
投票。该管道不能直接访问结果文档中的字段。为了访问结果文档中的
thumbsup
字段和thumbsdown
字段,该管道使用$$new
变量;即$$new.thumbsup
和$new.thumbsdown
。该管道可直接访问集合中现有文档内的
thumbsup
和thumbsdown
字段;即$thumbsup
和$thumbsdown
。
生成的文档将替换现有文档。
要在合并操作后查看 monthlytotals
集合中的文档,请运行以下操作:
db.monthlytotals.find()
集合包含以下文档:
{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }
使用变量自定义合并
您可以在 $merge
阶段 whenMatched 字段中使用变量。必须先定义变量,然后才能使用。
使用以下一种或两种方式定义变量:
要在 whenMatched 中使用变量,请执行以下操作:
以 $$<variable_name>
形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year
。如果将此变量设为文档,则还能以 $$<variable_name>.<field>
形式包含文档字段。例如,$$year.month
。
下面的标签页展示在合并阶段、聚合命令或两者中定义变量时的行为。
使用在合并阶段定义的变量
您可以在 $merge
阶段 let 中定义变量,并在 whenMatched 字段中使用变量。
示例:
db.cakeSales.insertOne( [ { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ] ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {} } ) db.cakeSales.find()
示例:
创建一个集合,名为
cakeSales
运行
aggregate
命令,在$merge
let 中定义year
变量,并使用 whenMatched 将年份添加到cakeSales
中。retrieves the
cakeSales
document
输出:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580, "salesTrend" : "up", "salesYear" : "2020" }
使用聚合命令中定义的变量
版本 5.0 中的新增功能。
您可以在 aggregate
命令 let 中定义变量,并在 $merge
阶段 whenMatched 字段中使用变量。
示例:
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2020" } } ) db.cakeSales.find()
示例:
创建一个集合,名为
cakeSales
运行
aggregate
命令,该命令会在aggregate
命令 let 中定义year
变量,并使用 whenMatched 将年份添加到cakeSales
中retrieves the
cakeSales
document
输出:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580, "salesTrend" : "up", "salesYear" : "2020" }
使用在合并阶段和聚合命令中定义的变量
您可以在 $merge
阶段定义变量,从 MongoDB 5.0 开始,还可以使用 aggregate
命令。
如果在 $merge
阶段和 aggregate
命令中定义了两个同名变量,则使用 $merge
阶段变量。
此示例中使用的是 year: "2020"
$merge
阶段变量,而不是 year: "2019"
aggregate
命令变量:
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2019" } } ) db.cakeSales.find()
输出:
{ _id: 1, flavor: 'chocolate', salesTotal: 1580, salesTrend: 'up', salesYear: '2020' }