执行增量 Map-Reduce
注意
聚合管道作为替代方案
从MongoDB 5.0开始, map-reduce已弃用:
您应该使用聚合管道,而不是 map-reduce。聚合管道提供比 map-reduce 更好的性能和可用性。
对于需要自定义功能的 map-reduce 操作,可以使用
$accumulator
和$function
聚合操作符。可以使用这些操作符在 JavaScript 中定义自定义聚合表达式。
有关 map-reduce 的聚合管道替代方案的示例,请参阅:
本部分提供了一个不使用自定义函数的 map-reduce 聚合管道替代方案示例。 有关使用自定义函数的示例,请参阅Map-Reduce 到聚合管道。
要执行 map-reduce 操作,MongoDB 提供了mapReduce
命令,并在 mongosh
中提供了db.collection.mapReduce()
包装器方法。
如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce,而不是每次都对整个数据集执行 map-reduce 操作。
要执行增量 map-reduce:
对当前collection运行 map-reduce 作业,并将结果输出到单独的collection。
当有更多数据要处理时,请使用以下命令运行后续 map-reduce 作业:
query
参数,用于指定仅匹配新文档的条件。out
参数,用于指定reduce
操作以将新结果合并到现有输出collection中。
考虑以下示例,您安排对usersessions
collection 执行 map-reduce 操作,使其在每天结束时运行。
数据设置
usersessions
集合包含每天记录用户会话的文档,例如:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
当前collection的初始 map-reduce
运行第一个 map-reduce 操作,如下所示:
定义映射函数,将
userid
映射到包含字段total_time
、count
和avg_time
的对象:var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; 使用两个参数
key
和values
定义相应的 reduce 函数,以计算总时间和计数。key
对应于userid
,values
是一个数组,其元素对应于映射到userid
mapFunction
的各个对象。var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; 使用两个参数
key
和reducedValue
定义 finalize 函数。 该函数修改reducedValue
文档以添加另一个字段average
并返回修改后的文档。var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; 使用
mapFunction
、reduceFunction
和finalizeFunction
函数对usersessions
collection执行 map-reduce。将结果输出到collectionsession_stats
中。如果session_stats
collection 已存在,则该操作将替换以下内容:db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) 查询
session_stats
集合以验证结果:db.session_stats.find().sort( { _id: 1 } ) 该操作将返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
后续增量 Map-Reduce
稍后,随着usersessions
集合的增长,您可以运行其他 map-reduce 操作。 例如,向usersessions
collection 添加新文档:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
最终,对usersessions
collection 执行增量 map-reduce,但使用query
字段仅选择新文档。将结果输出到collectionsession_stats
中,但会使用增量 map-reduce 的结果来reduce
内容:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
查询 session_stats
集合以验证结果:
db.session_stats.find().sort( { _id: 1 } )
该操作将返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
聚合替代方案
作为 map-reduce 的替代方案,您可以使用结合了 和 $group
$merge
阶段的 聚合管道 ,以更灵活的操作实现相同的结果。
重新创建usersessions
collection:
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
使用可用的聚合管道操作符,您可以重写 map-reduce 示例,而无需定义自定义函数:
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
$group
按userid
进行分组并计算:该操作将返回以下文档:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } $project
阶段重塑输出文档以镜像 map-reduce 的输出,使其具有两个字段_id
和value
。 如果不需要镜像_id
和value
结构,则此阶段是可选的。{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } $merge
阶段将结果输出到session_stats_agg
集合。 如果现有文档的_id
与新结果相同,则该操作将应用指定的管道,根据结果和现有文档计算 total_time、count 和 avg_time。 如果session_stats_agg
中不存在具有相同_id
的现有文档,则该操作将插入该文档。查询
session_stats_agg
集合以验证结果:db.session_stats_agg.find().sort( { _id: 1 } ) 该操作将返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } 将新文档添加到
usersessions
collection:db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) 在管道的开头添加
$match
阶段以指定日期筛选器:db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) 查询
session_stats_agg
集合以验证结果:db.session_stats_agg.find().sort( { _id: 1 } ) 该操作将返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } 可选。 为了避免在每次运行时都修改聚合管道的
$match
日期条件,您可以定义在辅助函数中包装聚合:updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; 然后,在运行时,您只需将开始日期传递给
updateSessionStats()
函数即可:updateSessionStats(ISODate('2020-03-05 00:00:00'))