Docs 菜单

执行增量 Map-Reduce

注意

聚合管道作为替代方案

从MongoDB 5.0开始, map-reduce已弃用:

  • 您应该使用聚合管道,而不是 map-reduce。聚合管道提供比 map-reduce 更好的性能和可用性。

  • 您可以使用聚合管道阶段(例如 $group$merge 等)重写 map-reduce 操作。

  • 对于需要自定义功能的 map-reduce 操作,可以使用 $accumulator$function 聚合操作符。可以使用这些操作符在 JavaScript 中定义自定义聚合表达式。

有关 map-reduce 的聚合管道替代方案的示例,请参阅:

本部分提供了一个不使用自定义函数的 map-reduce 聚合管道替代方案示例。 有关使用自定义函数的示例,请参阅Map-Reduce 到聚合管道。

要执行 map-reduce 操作,MongoDB 提供了mapReduce命令,并在 mongosh中提供了db.collection.mapReduce()包装器方法。

如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce,而不是每次都对整个数据集执行 map-reduce 操作。

要执行增量 map-reduce:

  1. 对当前collection运行 map-reduce 作业,并将结果输出到单独的collection。

  2. 当有更多数据要处理时,请使用以下命令运行后续 map-reduce 作业:

    • query参数,用于指定匹配新文档的条件。

    • out参数,用于指定reduce操作以将新结果合并到现有输出collection中。

考虑以下示例,您安排对usersessions collection 执行 map-reduce 操作,使其在每天结束时运行。

usersessions集合包含每天记录用户会话的文档,例如:

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

运行第一个 map-reduce 操作,如下所示:

  1. 定义映射函数,将userid映射到包含字段total_timecountavg_time的对象:

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. 使用两个参数keyvalues定义相应的 reduce 函数,以计算总时间和计数。 key对应于useridvalues 是一个数组,其元素对应于映射到userid mapFunction的各个对象。

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. 使用两个参数keyreducedValue定义 finalize 函数。 该函数修改reducedValue文档以添加另一个字段average并返回修改后的文档。

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. 使用mapFunctionreduceFunctionfinalizeFunction函数对usersessions collection执行 map-reduce。将结果输出到collectionsession_stats中。如果session_stats collection 已存在,则该操作将替换以下内容:

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. 查询 session_stats 集合以验证结果:

    db.session_stats.find().sort( { _id: 1 } )

    该操作将返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

稍后,随着usersessions集合的增长,您可以运行其他 map-reduce 操作。 例如,向usersessions collection 添加新文档:

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

最终,对usersessions collection 执行增量 map-reduce,但使用query 字段仅选择新文档。将结果输出到collectionsession_stats中,但会使用增量 map-reduce 的结果来reduce内容:

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

查询 session_stats 集合以验证结果:

db.session_stats.find().sort( { _id: 1 } )

该操作将返回以下文档:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

作为 map-reduce 的替代方案,您可以使用结合了 和 $group$merge阶段的 聚合管道 ,以更灵活的操作实现相同的结果。

重新创建usersessionscollection:

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

使用可用的聚合管道操作符,您可以重写 map-reduce 示例,而无需定义自定义函数:

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. $groupuserid进行分组并计算:

    • 使用$sum操作符的total_time

    • 使用$sum操作符的count

    • 使用$avg操作符的avg_time

    该操作将返回以下文档:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. $project阶段重塑输出文档以镜像 map-reduce 的输出,使其具有两个字段_idvalue 。 如果不需要镜像_idvalue结构,则此阶段是可选的。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. $merge阶段将结果输出到session_stats_agg集合。 如果现有文档的_id与新结果相同,则该操作将应用指定的管道,根据结果和现有文档计算 total_time、count 和 avg_time。 如果session_stats_agg中不存在具有相同_id的现有文档,则该操作将插入该文档。

  4. 查询 session_stats_agg 集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作将返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. 将新文档添加到usersessionscollection:

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. 在管道的开头添加$match阶段以指定日期筛选器:

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. 查询 session_stats_agg 集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作将返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. 可选。 为了避免在每次运行时都修改聚合管道的$match日期条件,您可以定义在辅助函数中包装聚合:

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    然后,在运行时,您只需将开始日期传递给updateSessionStats()函数即可:

    updateSessionStats(ISODate('2020-03-05 00:00:00'))