Docs 菜单
Docs 主页
/
MongoDB Manual
/ / /

$accumulator(聚合)

在此页面上

  • 定义
  • 语法
  • 行为
  • 举例
$accumulator

定义自定义累加器操作符。累加器是在文档通过管道时保持其状态(例如总计、最大值、最小值和相关数据)的操作符。使用 $accumulator操作符执行您自己的 JavaScript 函数,以实现 MongoDB 查询语言不支持的行为。另请参阅$function

$accumulator 可在以下阶段使用:

重要

在聚合操作符内执行 JavaScript 可能会降低性能。仅当提供的$accumulator 管道操作符 无法满足应用程序的需求时,才使用 操作符。

$accumulator操作符的语法如下:

{
$accumulator: {
init: <code>,
initArgs: <array expression>, // Optional
accumulate: <code>,
accumulateArgs: <array expression>,
merge: <code>,
finalize: <code>, // Optional
lang: <string>
}
}
字段
类型
说明
初始化
字符串或代码

用于初始化状态的函数。 init 函数从 initArgs 数组表达式中接收参数。您可以将函数定义指定为 BSON 类型 Code 或 String。

init 函数具有以下形式:

function (<initArg1>, <initArg2>, ...) {
...
return <initialState>
}

注意

溢出到磁盘或在分片集群上运行查询可导致累加器被计算为多个子累加的合并,其中的每个子累加都以调用 init() 开始。确保您的 init()accumulate()merge() 函数与此执行模型兼容。

阵列

可选。传递给 init 函数的参数。

initArgs 采用以下形式:

[ <initArg1>, <initArg2>, ... ]

重要

$bucketAuto 阶段中使用时,initArgs 不能指向组密钥(即不能使用 $<fieldName> 语法)。相反,在 $bucketAuto 阶段,只能在 initArgs 中指定常量值。

字符串或代码

用于累积文档的函数。 accumulate函数从当前状态和accumulateArgs数组表达式接收参数。 accumulate函数的结果将成为新状态。您可以将函数定义指定为 BSON 类型 Code 或 String。

accumulate 函数具有以下形式:

function(state, <accumArg1>, <accumArg2>, ...) {
...
return <newState>
}
阵列

传递给 accumulate 函数的参数。可以使用 accumulateArgs 指定要传递给 accumulate 函数的字段值。

accumulateArgs 采用以下形式:

[ <accumArg1>, <accumArg2>, ... ]
字符串或代码

用于合并两个内部状态的函数。 merge必须是字符串或代码 BSON 类型。 merge返回两个合并状态的组合结果。有关何时调用 merge 函数的信息,请参阅使用$merge合并两个状态。

merge 函数具有以下形式:

function (<state1>, <state2>) {
<logic to merge state1 and state2>
return <newState>
}
字符串或代码

可选。用于更新累积结果的函数。

finalize 函数具有以下形式:

function (state) {
...
return <finalState>
}
字符串

$accumulator代码中使用的语言。

重要

目前,lang 唯一受支持的值为 js

以下步骤概述了$accumulator操作符如何处理文档:

  1. 操作符从 init 函数定义的初始状态开始。

  2. 对于每份文档,操作符根据累积函数更新状态。 accumulate函数的第一个参数是当前状态,其他参数在accumulateArgs数组中指定。

  3. 当操作符需要合并多个中间状态时,它会执行merge函数。有关何时调用merge函数的更多信息,请参阅使用$merge合并两个状态。

  4. 如果定义了 finalize 函数,一旦处理完所有文档并相应更新了状态,finalize 就会将状态转换为最终输出。

作为其内部操作的一部分, $accumulator操作符可能需要合并两个独立的中间状态。 merge函数指定操作符应如何合并两个状态。

merge函数总是一次合并两个状态。如果必须合并两个以上的状态,则两个状态的合并结果将与单个状态合并。重复此过程,直到合并所有状态。

例如, $accumulator在以下情况下可能需要合并两个状态:

  • $accumulator 在分片集群上运行。操作符需要合并每个分片的结果,以获得最终结果。

  • 单个$accumulator操作超过其指定的内存限制。如果指定allowDiskUse选项,该操作符会将进行中的操作存储在磁盘上,并在内存中完成操作。操作完成后,将使用merge函数将磁盘和内存中的结果合并在一起。

MongoDB 处理init()accumulate()merge()函数的文档的顺序可能不同,并且可能与为$accumulator函数指定这些文档的顺序不同。

例如,考虑一系列文档,其中_id字段是字母表中的字母:

{ _id: 'a' },
{ _id: 'b' },
{ _id: 'c' }
...
{ _id: 'z' }

接下来,考虑一个聚合管道,它按_id字段对文档进行排序,然后使用$accumulator函数连接_id字段值:

[
{
$sort: { _id: 1 }
},
{
$group: {
_id: null,
alphabet: {
$accumulator: {
init: function() {
return ""
},
accumulate: function(state, letter) {
return(state + letter)
},
accumulateArgs: [ "$_id" ],
merge: function(state1, state2) {
return(state1 + state2)
},
lang: "js"
}
}
}
}
]

MongoDB 不保证按排序顺序处理文档,这意味着alphabet字段不一定会设置为abc...z

由于这种行为,请确保您的$accumulator函数不需要按特定顺序处理和返回文档。

要使用$accumulator ,必须启用服务器端脚本。

如果不使用$accumulator (或$function$wheremapReduce ),请禁用服务器端脚本:

另请参阅 ➤使用安全配置选项运行 MongoDB

MongoDB6 。0 升级用于 服务器端 JavaScript 、 、$accumulator $function和 表达式的内部$where JavaScript 引擎,并从 MozJS-60 升级到 MozJS-91 。 MozJS- 中存在的几个已弃用的非标准数组和字符串函数已在60 MozJS-91 中删除。

有关已删除数组和字符串函数的完整列表,请参阅 6.0 兼容性说明

注意

此示例演示如何使用$accumulator操作符实施 MongoDB 已支持的$avg操作符。此示例的目标不是实现新功能,而是用熟悉的逻辑说明$accumulator操作符的行为和语法。

mongosh中,使用以下文档创建名为books的示例集合:

db.books.insertMany([
{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 },
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 },
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 },
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 },
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
])

以下操作将文档按author groups并使用$accumulator计算每位作者跨书籍的平均副本数:

db.books.aggregate([
{
$group :
{
_id : "$author",
avgCopies:
{
$accumulator:
{
init: function() { // Set the initial state
return { count: 0, sum: 0 }
},
accumulate: function(state, numCopies) { // Define how to update the state
return {
count: state.count + 1,
sum: state.sum + numCopies
}
},
accumulateArgs: ["$copies"], // Argument required by the accumulate function
merge: function(state1, state2) { // When the operator performs a merge,
return { // add the fields from the two states
count: state1.count + state2.count,
sum: state1.sum + state2.sum
}
},
finalize: function(state) { // After collecting the results from all documents,
return (state.sum / state.count) // calculate the average
},
lang: "js"
}
}
}
}
])

此操作返回以下结果:

{ "_id" : "Dante", "avgCopies" : 1.6666666666666667 }
{ "_id" : "Homer", "avgCopies" : 10 }

$accumulator定义了一个初始状态,其中countsum均设置为0 。对于$accumulator处理的每个文档,它都会通过以下方式更新状态:

  • count 增加 1,然后

  • 将文档的copies字段的值添加到sum中。那里的accumulate函数可以访问copies字段,因为它是在accumulateArgs字段中传递的。

对于处理的每个文档,accumulate 函数都返回更新状态。

处理完所有文档后, finalize函数会将副本的sum除以文档的count ,以获得平均值。这样就无需保持运行计算平均值,因为finalize函数会接收所有文档的累积sumcount

此操作等同于使用 $avg 运算符的以下管道:

db.books.aggregate([
{
$group : {
_id : "$author",
avgCopies: { $avg: "$copies" }
}
}
])

您可以使用initArgs选项来改变$accumulator的初始状态。如果您愿意,这可能很有用,例如:

  • 使用不属于您所在状态的字段的值来影响您的状态,或者

  • 根据正在处理的群组将初始状态设置为不同值。

mongosh中,使用以下文档创建名为restaurants的示例集合:

db.restaurants.insertMany([
{ "_id" : 1, "name" : "Food Fury", "city" : "Bettles", "cuisine" : "American" },
{ "_id" : 2, "name" : "Meal Macro", "city" : "Bettles", "cuisine" : "Chinese" },
{ "_id" : 3, "name" : "Big Crisp", "city" : "Bettles", "cuisine" : "Latin" },
{ "_id" : 4, "name" : "The Wrap", "city" : "Onida", "cuisine" : "American" },
{ "_id" : 5, "name" : "Spice Attack", "city" : "Onida", "cuisine" : "Latin" },
{ "_id" : 6, "name" : "Soup City", "city" : "Onida", "cuisine" : "Chinese" },
{ "_id" : 7, "name" : "Crave", "city" : "Pyote", "cuisine" : "American" },
{ "_id" : 8, "name" : "The Gala", "city" : "Pyote", "cuisine" : "Chinese" }
])

假设某应用程序允许用户查询此数据以查找餐厅。它可能有助于显示该用户所在城市的更多结果。在此示例中,假定用户的城市在名为 userProfileCity 的变量中被调用。

以下聚合管道groupscity处理文档。该操作使用$accumulator显示每个城市的不同数量的结果,具体取决于餐厅所在城市是否与用户个人资料中的城市相匹配:

注意

要在mongosh中执行此示例,请将initArgs中的<userProfileCity>替换为包含实际城市值的字符串,例如Bettles

1db.restaurants.aggregate([
2{
3 $group :
4 {
5 _id : { city: "$city" },
6 restaurants:
7 {
8 $accumulator:
9 {
10 init: function(city, userProfileCity) { // Set the initial state
11 return {
12 max: city === userProfileCity ? 3 : 1, // If the group matches the user's city, return 3 restaurants
13 restaurants: [] // else, return 1 restaurant
14 }
15 },
16
17 initArgs: ["$city", <userProfileCity>], // Argument to pass to the init function
18
19 accumulate: function(state, restaurantName) { // Define how to update the state
20 if (state.restaurants.length < state.max) {
21 state.restaurants.push(restaurantName);
22 }
23 return state;
24 },
25
26 accumulateArgs: ["$name"], // Argument required by the accumulate function
27
28 merge: function(state1, state2) {
29 return {
30 max: state1.max,
31 restaurants: state1.restaurants.concat(state2.restaurants).slice(0, state1.max)
32 }
33 },
34
35 finalize: function(state) { // Adjust the state to only return field we need
36 return state.restaurants
37 }
38
39 lang: "js"
40 }
41 }
42 }
43}
44])

如果 userProfileCity 的值为 Bettles,此操作会返回以下结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury", "Meal Macro", "Big Crisp" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果 userProfileCity 的值为 Onida,此操作会返回以下结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap", "Spice Attack", "Soup City" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果 userProfileCity 的值为 Pyote,此操作会返回以下结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave", "The Gala" ] } }

如果 userProfileCity 的值是任何其他值,则该操作将返回以下结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

init函数定义包含maxrestaurants字段的初始状态。 max字段设置该特定群组的最大餐厅数量。如果文档的city字段与userProfileCity匹配,则该群组最多包含3家餐厅。否则,如果文档_iduserProfileCity不匹配,则该群组最多包含一家餐厅。 init函数接收initArgs数组中的两个city userProfileCity参数。

对于$accumulator处理的每个文档,它都会将餐厅的name推送到restaurants数组,前提是该名称不会使restaurants的长度超过max值。对于处理的每个文档, accumulate函数都会返回更新后的状态。

merge 函数定义如何合并两个状态。该函数将每个状态的restaurant 数组连接在一起,并使用 slice() 方法,以确保其不超过max 值。

处理完所有文档后, finalize函数会修改结果状态,仅返回餐厅名称。如果没有此函数, max字段也会包含在输出中,这无法满足应用程序的任何需求。

← $abs(聚合)