Docs 菜单
Docs 主页
/
MongoDB Manual
/ / /

$merge(聚合)

在此页面上

  • 定义
  • 兼容性
  • 语法
  • 注意事项
  • 限制
  • 举例

注意

本页介绍了 $merge阶段,该阶段将聚合管道结果输出到集合中。有关将多个文档合并为单个文档的 $mergeObjects操作符,请参阅$mergeObjects

$merge

聚合管道的结果写入指定的集合。$merge 操作符必须是管道的最后一个阶段。

$merge 阶段:

  • 可以输出到相同或不同数据库中的集合。

  • 可以输出到正在聚合的同一集合。有关更多信息,请参阅输出到正在聚合的同一集合。

  • 聚合管道$merge$out 中使用 或 阶段时,请考虑以下几点:

    • 从 MongoDB5 0开始。$merge ,如果集群中所有节点都将 featureCompatibilityVersion 设置为5.0 或更高,并且 读取偏好 允许从节点 读取 ,则具有 阶段的管道可以在副本集从节点上运行。

      • $merge$out阶段在从节点上运行,但写入操作会发送到节点。

      • 并非所有驱动程序版本都支持发送到从节点的$merge操作。有关详细信息,请参阅驱动程序文档。

    • 在早期 MongoDB 版本中,具有$out$merge阶段的管道始终在主节点上运行,并且不考虑读取偏好。

  • 如果输出集合不存在,则创建一个新集合。

  • 可将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)并入现有集合。

  • 可输出到分片集合。输入集合也可以是分片的。

有关与同样将聚合结果输出到集合中的 $out 阶段的比较,请参阅 $merge$out 比较

注意

按需物化视图

$merge 可以将管道结果纳入现有的输出集合,而不是执行对集合完全替换。此功能允许用户创建按需物化视图,在管道运行时增量更新输出集合的内容。

有关此用例的更多信息,请参阅《按需物化视图》以及本页上的示例。

物化视图与只读视图是分开的。有关创建只读视图的信息,请参阅只读视图。

可以使用 $merge 查找托管在以下环境中的部署:

  • MongoDB Atlas :用于在云中部署 MongoDB 的完全托管服务

$merge 通过以下语法实现:

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

例如:

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

如果使用了 $merge 的所有默认选项(包括写入同一数据库中的集合),则可使用简化形式:

{ $merge: <collection> } // Output collection is in the same database

$merge 接受包含以下字段的文档:

字段
说明

输出集合。指定以下任一项:

  • 将集合名称作为字符串输出到运行聚合的同一数据库中的集合。例如:

    into: "myOutput"

  • 文档中的数据库和集合名称,以输出到指定数据库中的集合。例如:

    into: { db:"myDB", coll:"myOutput" }

注意

  • 如果输出集合不存在,$merge 将创建集合:

    • 对于副本集或独立运行的实例,如果输出数据库不存在,$merge 也会创建该数据库。

    • 对于分片集群,指定的输出数据库必须已经存在。

  • 输出集合可以是分片集合。

可选。充当文档唯一标识符的一个或多个字段。标识符决定结果文档是否与输出集合中的现有文档相匹配。指定以下任一项:

  • 字符串形式的单个字段名称。例如:

    on: "_id"

  • 数组中的字段组合。例如:

    on: [ "date", "customerId" ]
    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.

对于指定的一个或多个字段:

  • 聚合结果文档必须包含 on 中指定的字段,除非 on 字段是 _id 字段。如果结果文档中缺少 _id 字段,MongoDB 会自动添加。

  • 指定的一个或多个字段不能包含 null 或数组值。

$merge 需要一个唯一索引,其中包含对应于 on 标识符字段的键。尽管索引键规范的顺序并不重要,但唯一索引必须仅包含 on 字段作为其键。

  • 索引必须与聚合具有相同的排序规则

  • 唯一索引可为稀疏索引。

  • 唯一索引不能为部分索引。

  • 对于已存在的输出集合,相应的索引必须已存在。

on 的默认值取决于输出集合:

  • 如果输出集合不存在,则 on 标识符必须为且默认为 _id 字段。系统会自动创建相应的唯一 _id 索引。

    提示

    要为不存在的集合使用不同的 on 标识符字段,可以先在所需字段上创建唯一索引来创建集合。有关示例,请参阅关于不存在的输出集合的部分

  • 如果现有输出集合未进行分片,on 标识符则默认为 _id 字段。

  • 如果现有输出集合为分片集合,则 on 标识符默认为所有分片键字段和 _id 字段。如果指定了不同的 on 标识符,则 on 必须包含所有分片键字段。

可选。$merge 的行为(如果结果文档和集合中的现有文档针对指定 on 字段的值相同)。

您可以指定以下任一项:

  • 预定义的动作字符串之一:

    操作
    说明

    输出集合中的现有文档替换为匹配的结果文档。

    执行替换时,替换文档不能导致修改 _id 值,或者,如果输出集合是分片的,则不能修改分片键值。否则,操作将产生错误。

    提示

    为避免出现此错误,如果 on 字段不包括 _id 字段,则应删除聚合结果中的 _id 字段,以避免出现错误,如前面的 $unset 阶段等。

    输出集合中保留现有文档。

    "merge"(默认)

    合并匹配文档(类似于 $mergeObjects 操作符)。

    • 如果结果文档包含不在现有文档中的字段,请将这些新字段添加到现有文档中。

    • 如果结果文档包含现有文档中的字段,则将现有的字段值替换为来自结果文件的值。

    例如,如果输出集合中有以下文档:

    { _id: 1, a: 1, b: 1 }

    且聚合结果中以下文档:

    { _id: 1, b: 5, z: 1 }

    则合并文档为:

    { _id: 1, a: 1, b: 5, z: 1 }

    执行合并时,合并的文档不会导致分片键值(如果输出集合已分片)或 _id 值发生修改。否则,操作将产生错误。

    提示

    为避免出现此错误,如果 on 字段不包括 _id 字段,则应删除聚合结果中的 _id 字段,以避免出现错误,如前面的 $unset 阶段等。

    停止聚合操作并使其失败。先前文档对输出集合的任何更改都不会恢复。

  • 用于更新集合中文档的聚合管道。

    [ <stage1>, <stage2> ... ]

    该管道只能由以下阶段组成:

    该管道无法修改 on 字段的值。例如,如果您要对 month 字段进行匹配,则该管道无法修改 month 字段。

    whenMatched pipeline 可以使用 $<field> 直接访问输出集合中现有文档的字段。

    要访问聚合结果文档中的字段,请使用以下任一项:

    • 用于访问字段的内置 $$new 变量。具体来说,为 $$new.<field>$$new 变量仅在省略 let 规范时才可用。

      注意

      从 MongoDB 4.2.2 开始,$$new 变量被保留,并且不能被覆盖。

    • let 字段中的用户自定义变量。

      $$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

      有关更多示例,请参阅使用变量自定义合并

可选。指定在 WhenMatched 管道 中使用的变量。

指定包含变量名和值表达式的文档:

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

如果未指定,默认为 { new: "$$ROOT" }(请参阅 ROOT)。whenMatched 管道可以访问 $$new 变量。

注意

从 MongoDB 4.2.2 开始,$$new 变量被保留,并且不能被覆盖。

要访问 whenMatched 管道中的变量,请执行以下操作:

$$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

有关示例,请参阅使用变量自定义合并

可选。$merge 的行为(如果结果文档与输出集合中的现有文档不匹配)。

您可以指定以下预定义的动作字符串之一:

操作
说明
"insert"(默认)

将此文档插入输出集合。

丢弃文档。具体来说,$merge 不会将文档插入到输出集合。

停止聚合操作并使其失败。已写入输出集合的所有更改均不会恢复。

如果聚合管道结果中的文档中不存在 _id 字段,则 $merge 阶段会自动生成该字段。

例如,在以下聚合管道中,$project 从传递给 $merge 的文档中排除 _id 字段。当 $merge 将这些文档写入 "newCollection" 时,$merge 会生成一个新的 _id 字段和值。

db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

如果指定的输出集合不存在,则 $merge 操作会创建一个新集合。

  • $merge 将第一个文档写入集合时,输出集合就已创建,且立即可见。

  • 如果聚合失败,则在错误发生之前,$merge 完成的任何写入都不会回滚。

注意

对于副本集或独立运行的实例,如果输出数据库不存在,$merge 也会创建该数据库。

对于分片集群,指定的输出数据库必须已经存在。

如果输出集合不存在,$merge 要求将 on 标识符作为 _id 字段。要为不存在的集合使用不同的 on 字段值,可以先在所需字段上创建唯一索引来创建集合。例如,如果输出集合 newDailySales201905 不存在,而您想指定 salesDate 字段作为 on 标识符:

db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )

$merge 阶段可输出到分片集合。对输出集合进行分片时,$merge 使用 _id 字段和所有分片键字段作为默认 on 标识符。如果覆盖此默认值,on 标识符则须包含所有分片键字段:

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

例如,在具有sharding enabled的数据库中,使用sh.shardCollection()方法创建新的分片集合newrestaurants ,其中postcode字段作为分片键。

sh.enableSharding("exampledb"); // Sharding must be enabled in the database
sh.shardCollection(
"exampledb.newrestaurants", // Namespace of the collection to shard
{ postcode: 1 }, // Shard key
);

newrestaurants 集合将包含按月份(date 字段)和邮政编码(分片键)列出的新餐厅开业信息的文档;具体来说,on 标识符是 ["date", "postcode"](字段的顺序无关紧要)。由于 $merge 需要唯一索引,并且具有对应 on 标识符字段的键,因此请创建唯一索引(字段的顺序无关紧要):[1]

use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

通过创建分片集合 restaurants 和唯一索引后,可以使用 $merge 将聚合结果输出到此集合,匹配 [ "date", "postcode" ],如下例所示:

use exampledb
db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
[1] 在传递 { unique: true } 选项时,sh.shardCollection() 方法还可以在分片键上创建唯一索引,前提是:分片键基于范围,集合为空,并且分片键上的唯一索引尚不存在。在前面的示例中,由于 on 标识符是分片键和另一个字段,因此需要单独的操作来创建相应的索引。

$merge 可以替换输出集合中的现有文档,前提是聚合结果包含根据 on 规范匹配的一个或多个文档。因此,如果聚合结果包括集合中所有现有文档的匹配文档,并且您为 whenMatched 指定“replace”,则 $merge 可以替换现有集合中的所有文档。

但是,在不考虑聚合结果的情况下,如果要替换现有集合,请使用 $out

如果 $merge 导致现有文档的_id 值发生变化,则出现 $merge 错误。

提示

为避免出现此错误,如果 on 字段不包括 _id 字段,则应删除聚合结果中的 _id 字段,以避免出现错误,如前面的 $unset 阶段等。

此外,对于分片集合,如果 $merge 导致现有文档的分片键值发生变化,则也会产生错误。

在错误发生之前,$merge 完成的任何写入都不会回滚。

如果 $mergeon 字段使用的唯一索引在聚合过程中被删除,则无法保证会终止聚合。如果继续聚合,则无法保证文档中没有重复的 on 字段值。

如果 $merge 尝试写入的文档违反输出集合上的任何唯一索引,则操作会产生错误。例如:

  • 插入一个不匹配的文档,该文档违反了唯一索引(非 on 字段的索引)。

  • 失败,如果集合中有匹配的文档。具体而言,该操作会尝试插入一个违反了 on 字段唯一索引的匹配文档。

  • 将现有文档替换为违反了唯一索引而不是 on 字段的索引的新文档。

  • 合并导致文档违反唯一索引而不是 on 字段的索引的匹配文档。

如果您的集合使用 模式验证 并将validationAction error$merge设置为MongoServerError ,则使用 插入无效文档或更新具有无效值的文档会引发 ,并且文档不会写入目标集合。如果存在多个无效文档,则只有遇到的第一个无效文档才会引发错误。所有有效文档均会写入目标集合,所有无效文档均无法写入。

对于 $merge 阶段,如果以下条件全部为真,则 $merge 将文档直接插入到输出集合:

$merge 将文档直接插入到输出集合中。

在 MongoDB 4.2.2 之前,当满足 $merge 阶段的这些条件时,将使用空的输入文档执行 whenMatched 字段中指定的管道。管道中生成的文档将插入输出集合中。

随着 $merge 的引入,MongoDB 提供两个阶段,即 $merge$out,用于将聚合管道的结果写入集合:

  • 可以输出到相同或不同数据库中的集合。

  • 可以输出到相同或不同数据库中的集合。

  • 如果输出集合不存在,则创建一个新集合。

  • 如果输出集合不存在,则创建一个新集合。

  • 完全替换已存在的输出集合。

  • 可输出到分片集合。输入集合也可以是分片的。

  • 无法输出到分片集合。但是,可以对输入集合进行分片。

  • 对应 SQL 语句:

    • MERGE

    • INSERT INTO T2 SELECT FROM T1

    • SELECT INTO T2 FROM T1

    • 创建/刷新物化视图。

  • 对应 SQL 语句:

    • INSERT INTO T2 SELECT FROM T1

    • SELECT INTO T2 FROM T1

警告

$merge输出到正在聚合的同一集合时,文档可能会被多次更新,或者操作可能会导致无限循环。 当$merge执行的更新更改了磁盘上存储的文档的物理位置时,就会出现此行为。 当文档的物理位置发生变化时, $merge可能会将其视为全新文档,从而进行更多更新。 有关此行为的更多信息,请参阅 万圣节问题。

$merge 可以输出到正在聚合的同一集合。此外,也可以输出到管道的其他阶段中出现的集合,例如 $lookup

限制
说明
聚合管道不能在事务中使用 $merge
聚合管道不能使用 $merge 输出到时间序列集合。
Separate from materialized view
视图定义不能包含$merge阶段。如果视图定义包含嵌套管道(例如,视图定义包含$facet阶段),则此$merge阶段限制也适用于嵌套管道。
$lookup 阶段
$lookup 阶段的 嵌套管道不能包括 $merge 阶段。
$facet 阶段
$facet 阶段的 嵌套管道不能包括 $merge 阶段。
$unionWith 阶段
$unionWith 阶段的 嵌套管道不能包括 $merge 阶段。
"linearizable" 读关注 (read concern)

$merge 阶段不能与读关注 "linearizable" 一起使用。也就是说,如果您为 db.collection.aggregate() 指定 "linearizable" 读关注,则不能将 $merge 阶段包括在管道中。

如果输出集合不存在,则 $merge 会创建该集合。

例如,zoo 数据库中的 salaries 集合填充有员工工资和部门历史:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])

您可以使用 $group$merge 阶段从当前位于 salaries 集合中的数据初始创建名为 budgets 的集合(在 reporting 数据库中):

注意

对于副本集或独立部署,如果输出数据库不存在,则 $merge 也会创建该数据库。

对于分片集群部署,指定输出数据库必须已经存在。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  • $group 阶段以按 fiscal_yeardept 对这些工资进行分组。

  • $merge 阶段将前一个 $group 阶段的输出写入 reporting 数据库中的 budgets 集合。

要查看新 budgets 集合中的文档:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets 集合包含以下文档:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

提示

另请参阅:

以下示例将使用上一示例中的集合。

示例 salaries 集合包含员工工资和部门历史:

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

示例 budgets 集合包含年度累积预算:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

在本财政年度(本例中为 2019),salaries 集合中添加了新员工,并为下一年度预分配了新员工人数:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
])

要更新 budgets 集合以反映新的工资信息,使用以下聚合管道:

  • $match 阶段,查找 fiscal_year 大于或等于 2019 的所有文档:

  • $group 阶段以按 fiscal_yeardept 对这些工资进行分组。

  • $merge 将结果集写入 budgets 集合,并替换文档,被替换文档具有相同的 _id 值(在本示例中,是具有财政年度和部门的文档)。对于集合中没有匹配项的文档,$merge 会插入新文档。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

聚合运行后,查看 budgets 集合中的文档:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets 集合包含 2019 财年的新薪资数据,并添加 2020 财年的新文档:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

提示

另请参阅:

为确保 $merge 不会覆盖集合中的现有数据,请将 whenMatched 设置为 keepExistingfail

zoo 数据库中的示例 salaries 集合包含员工薪资和部门历史记录:

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

reporting 数据库中的集合 orgArchive 包含过去财政年度的部门组织历史记录。不得修改存档记录。

{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }

orgArchive集合具有针对 fiscal_yeardept 字段的唯一复合索引。具体而言,同一财政年度与部门的组合最多应有一条记录:

db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

在当前财经年度结束时(本例中为 2019),salaries 集合包含以下文档:

{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
{ "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
{ "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
{ "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
{ "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
{ "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
{ "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
{ "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
{ "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
{ "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
{ "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
{ "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }

要更新 orgArchive 集合以包含刚刚结束的财政年度 2019,请使用以下聚合管道:

  • $match 阶段,查找 fiscal_year 等于 2019 的所有文档。

  • $group 阶段以按 fiscal_yeardept 对这些员工进行分组。

  • $project 阶段抑制 _id 字段并添加单独的 deptfiscal_year 字段。将文档传递给 $merge 时,$merge 会自动为这些文档生成新的 _id 字段。

  • $merge 将结果集写入 orgArchive

    $merge 阶段基于 deptfiscal_year 字段匹配文档,并且在匹配时会 fails。也就是说,如果同一部门和财政年度的文档已经存在,则发生 $merge 错误。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )

操作完成后,orgArchive 集合包含以下文档:

{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }

如果 orgArchive 集合已经包含部门 "A" 和/或 "B" 的 2019 年文档,则由于重复密钥错误,聚合会失败。但是,在错误发生之前插入的任何文档都不会回滚。

如果为匹配文档指定 keepExisting,则聚合不会影响匹配文档,也不会出现重复键错误。同样,如果指定 replace,操作不会失败,但会替换现有文档。

默认情况下,如果聚合结果中的文档与集合中的文档相匹配,$merge 阶段就会合并文档。

示例集合 purchaseorders 按季度和区域填充了采购订单信息:

db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )

另一示例集合 reportedsales 则包含按季度和地区报告的销售信息:

db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )

假设出于报告目的,您希望根据以下格式按季度查看数据:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

您可以使用 $merge 来合并来自 purchaseorders 集合和 reportedsales 集合的结果,从而创建一个新的集合 quarterlyreport

要创建 quarterlyreport 集合,可以使用以下管道:

db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第一个阶段:

$group 阶段按季度分组,并使用 $sumqty 字段添加到新的 purchased 字段中。例如:

要创建 quarterlyreport 集合,您可以使用此管道:

{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
第二阶段:
$merge 阶段会将这些文档写入同一数据库中的 quarterlyreport 集合。如果此阶段在集合中找到与 _id 字段匹配的现有文档,此阶段则会合并这些匹配的文档。否则,该阶段会插入文档。对于初始创建,所有文档都不应匹配。

要查看集合中的文档,运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

该集合包含以下文档:

{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }

同样,针对 reportedsales 集合运行以下聚合管道,将销售结果合并到 quarterlyreport 集合中。

db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第一个阶段:

$group 阶段按季度分组,并使用 $sumqty 字段添加到新的 sales 字段中。例如:

{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
第二阶段:
$merge 阶段将这些文档写入同一数据库中的 quarterlyreport 集合。如果该阶段在集合中找到与 _id 字段(季度)匹配的现有文档,则该阶段会合并匹配文档。否则,该阶段会插入文档。

要在合并数据后查看 quarterlyreport 集合中的文档,请运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

该集合包含以下文档:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

当文档匹配时,$merge 可以使用自定义更新管道whenMatched 管道可以包含以下阶段:

示例集合 votes 包含每日投票记录。创建包含以下文档的集合:

db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )

另一个示例集合 monthlytotals 包含最新的每月投票总数。使用以下文档创建集合:

db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)

每天结束时,将当天的选票插入 votes 集合中:

db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)

可以将 $merge 与自定义管道一起使用来更新集合 monthlytotals 中的现有文档:

db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
第一个阶段:

$match 阶段会查找特定日期的投票。例如:

{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
第二阶段:

$project 阶段将 _id 字段设置为年月字符串。例如:

{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
第三阶段:

$merge 阶段会将这些文档写入同一数据库中的 monthlytotals 集合。如果该阶段在集合中找到与 _id 字段匹配的现有文档,则该阶段将使用管道添加 thumbsup 投票和 thumbsdown 投票。

  • 该管道不能直接访问结果文档中的字段。为了访问结果文档中的 thumbsup 字段和 thumbsdown 字段,该管道使用 $$new 变量;即 $$new.thumbsup$new.thumbsdown

  • 该管道可直接访问集合中现有文档内的 thumbsupthumbsdown 字段;即 $thumbsup$thumbsdown

生成的文档将替换现有文档。

要在合并操作后查看 monthlytotals 集合中的文档,请运行以下操作:

db.monthlytotals.find()

集合包含以下文档:

{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }

您可以在 $merge 阶段 whenMatched 字段中使用变量。必须先定义变量,然后才能使用。

使用以下一种或两种方式定义变量:

要在 whenMatched 中使用变量,请执行以下操作:

$$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

下面的标签页展示在合并阶段、聚合命令或两者中定义变量时的行为。

您可以在 $merge 阶段 let 中定义变量,并在 whenMatched 字段中使用变量。

示例:

db.cakeSales.insertOne( [
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
] )
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {}
} )
db.cakeSales.find()

示例:

  • 创建一个collection,名为 cakeSales

  • 运行 aggregate 命令,在$merge let 中定义 year 变量,并使用 whenMatched 将年份添加到 cakeSales 中。

  • 检索cakeSales文档

输出:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

版本 5.0 中的新增功能

您可以在 aggregate 命令 let 中定义变量,并在 $merge 阶段 whenMatched 字段中使用变量。

示例:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
whenMatched: [ {
$addFields: { "salesYear": "$$year" } }
] }
}
],
cursor: {},
let : { year: "2020" }
} )
db.cakeSales.find()

示例:

  • 创建一个collection,名为 cakeSales

  • 运行 aggregate 命令,该命令会在 aggregate 命令 let 中定义 year 变量,并使用 whenMatched 将年份添加到 cakeSales

  • 检索cakeSales文档

输出:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

您可以在 $merge 阶段定义变量,从 MongoDB 5.0 开始,还可以使用 aggregate 命令。

如果在 $merge 阶段和 aggregate 命令中定义了两个同名变量,则使用 $merge 阶段变量。

此示例中使用的是 year: "2020" $merge 阶段变量,而不是 year: "2019" aggregate 命令变量:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )
db.cakeSales.find()

输出:

{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}

后退

$match

来年

$out