Perform Incremental Map-Reduce
Note
Aggregation Pipeline as Alternative
Starting in MongoDB 5.0, map-reduce is deprecated.
Instead of map-reduce, you should use an aggregation pipeline. Aggregation pipelines provide better performance and usability than map-reduce.
You can rewrite map-reduce operations using aggregation pipeline stages, such as $group and $merge.
For map-reduce operations that require custom functionality, you can use the $accumulator and $function aggregation operators. You can use those operators to define custom aggregation expressions in JavaScript.
For examples of replacing map-reduce with an aggregation pipeline, see the following.
This section has an example aggregation pipeline alternative to map-reduce that does not use a custom function. For an example that uses a custom function, see Map-Reduce to Aggregation Pipeline.
To perform map-reduce operations, MongoDB provides the mapReduce command and, in mongosh, the db.collection.mapReduce() wrapper method.
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.
To perform incremental map-reduce:
1. Run a map-reduce job over the current collection and output the result to a separate collection.
2. When you have more data to process, run subsequent map-reduce jobs with:
   - the query parameter that specifies conditions that match only the new documents.
   - the out parameter that specifies the reduce action to merge the new results into the existing output collection.
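The two steps above can be traced without a database. The following is a simplified plain-JavaScript model, not MongoDB's implementation: the names mapSession, reduceSessions, and runIncremental are hypothetical, and a Map stands in for the output collection. It shows why the reduce function must accept its own output as input: when a key already exists in the output, the stored value and the new value are reduced together.

```javascript
// Simplified model of incremental map-reduce (no MongoDB required).
// All function names here are illustrative assumptions.

// Map step: emit a [key, value] pair for one session document.
function mapSession(doc) {
  return [doc.userid, { total_time: doc.length, count: 1 }];
}

// Reduce step: sum totals and counts. The output has the same shape as
// the input values, so it can be reduced again later.
function reduceSessions(values) {
  const out = { total_time: 0, count: 0 };
  for (const v of values) {
    out.total_time += v.total_time;
    out.count += v.count;
  }
  return out;
}

// Runs map and reduce over `docs`, then folds each result into `output`
// the way the "reduce" output action does: if the key already exists,
// the stored value and the new value go through reduce once more.
function runIncremental(docs, output) {
  const grouped = new Map();
  for (const doc of docs) {
    const [key, value] = mapSession(doc);
    if (!grouped.has(key)) grouped.set(key, []);
    grouped.get(key).push(value);
  }
  for (const [key, values] of grouped) {
    const fresh = reduceSessions(values);
    const merged = output.has(key)
      ? reduceSessions([output.get(key), fresh])
      : fresh;
    output.set(key, merged);
  }
  return output;
}

const sessionStats = new Map();
// First run: everything currently in the collection.
runIncremental([{ userid: "a", length: 95 }, { userid: "a", length: 105 }], sessionStats);
// Later run: only the new documents.
runIncremental([{ userid: "a", length: 130 }], sessionStats);
console.log(sessionStats.get("a")); // { total_time: 330, count: 3 }
```

Processing only the new documents gives the same totals as reprocessing all three sessions, because summing is associative.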
Consider the following example where you schedule a map-reduce operation on a usersessions collection to run at the end of each day.
Data Setup
The usersessions collection contains documents that log users' sessions each day, for example:

db.usersessions.insertMany([
   { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])
Initial Map-Reduce of Current Collection
Run the first map-reduce operation as follows:
1. Define the map function that maps the userid to an object that contains the fields total_time, count, and avg_time:

   var mapFunction = function() {
      var key = this.userid;
      // avg_time is a placeholder here; the finalize function fills it in.
      var value = { total_time: this.length, count: 1, avg_time: 0 };
      emit( key, value );
   };

2. Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to the userid in the mapFunction.

   var reduceFunction = function(key, values) {
      var reducedObject = { total_time: 0, count: 0, avg_time: 0 };
      // Sum the totals and counts; the average is computed later in finalize.
      values.forEach(function(value) {
         reducedObject.total_time += value.total_time;
         reducedObject.count += value.count;
      });
      return reducedObject;
   };

3. Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to set the avg_time field and returns the modified document.

   var finalizeFunction = function(key, reducedValue) {
      if (reducedValue.count > 0)
         reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
      return reducedValue;
   };

4. Perform map-reduce on the usersessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stats. If the session_stats collection already exists, the operation replaces the contents:

   db.usersessions.mapReduce(
      mapFunction,
      reduceFunction,
      {
         out: "session_stats",
         finalize: finalizeFunction
      }
   )

5. Query the session_stats collection to verify the results:

   db.session_stats.find().sort( { _id: 1 } )

   The operation returns the following documents:

   { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
   { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
   { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
   { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
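The reduce function in this example leaves avg_time at 0 and the finalize function derives it from total_time and count. One likely reason for this split is that averages cannot be merged directly, while totals and counts can. The following plain-JavaScript sketch illustrates this with user "a", assuming a first batch with sessions of 95 and 105 and a later batch with one session of 130; the variable names are illustrative only.

```javascript
// Partial results for user "a" from two separate batches.
const batch1 = { total_time: 200, count: 2 }; // sessions of 95 and 105
const batch2 = { total_time: 130, count: 1 }; // one session of 130

// Incorrect: averaging the per-batch averages ignores the batch sizes.
const averageOfAverages =
  (batch1.total_time / batch1.count + batch2.total_time / batch2.count) / 2;

// Correct: merge totals and counts first, then divide once at the end.
const merged = {
  total_time: batch1.total_time + batch2.total_time,
  count: batch1.count + batch2.count,
};
const avgTime = merged.total_time / merged.count;

console.log(averageOfAverages); // 115
console.log(avgTime);           // 110
```

Keeping only sums in the reduce step means the stored values can be reduced again with later results without skewing the average.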
Subsequent Incremental Map-Reduce
Later, as the usersessions collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions collection:

db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
   { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
   { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
   { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])
At the end of the day, perform incremental map-reduce on the usersessions collection, but use the query field to select only the new documents. Output the results to the collection session_stats, but reduce the contents with the results of the incremental map-reduce:

db.usersessions.mapReduce(
   mapFunction,
   reduceFunction,
   {
      query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
      out: { reduce: "session_stats" },
      finalize: finalizeFunction
   }
);
Query the session_stats collection to verify the results:

db.session_stats.find().sort( { _id: 1 } )

The operation returns the following documents:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
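For a job that runs at the end of each day, the cutoff date in the query can be computed rather than typed by hand. The sketch below is one possible approach, not part of the original example: startOfUtcDay and incrementalOptions are hypothetical helpers, and the code assumes that ts values are stored in UTC and that the job runs once per day before midnight UTC.

```javascript
// Hypothetical helper: midnight UTC of the day that contains `date`.
function startOfUtcDay(date) {
  return new Date(Date.UTC(
    date.getUTCFullYear(),
    date.getUTCMonth(),
    date.getUTCDate()
  ));
}

// Hypothetical helper: builds the options document for one incremental
// run, selecting only documents whose ts falls on or after today's start.
function incrementalOptions(now, finalizeFunction) {
  return {
    query: { ts: { $gte: startOfUtcDay(now) } },
    out: { reduce: "session_stats" },
    finalize: finalizeFunction,
  };
}

// Stand-in finalize function so the sketch runs outside mongosh.
const passThrough = function (key, reducedValue) { return reducedValue; };

const options = incrementalOptions(new Date("2020-03-05T23:59:00Z"), passThrough);
console.log(options.query.ts.$gte.toISOString()); // 2020-03-05T00:00:00.000Z
```

In mongosh, the result could be passed as the third argument, for example db.usersessions.mapReduce(mapFunction, reduceFunction, incrementalOptions(new Date(), finalizeFunction)). Running the job twice with the same cutoff would reduce the same documents into session_stats twice and double-count them, so each document should be selected by exactly one run.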
Aggregation Alternative
As an alternative to map-reduce, you can use an aggregation pipeline that combines $group and $merge stages to achieve the same result in a more flexible operation.

Recreate the usersessions collection:

db.usersessions.drop();

db.usersessions.insertMany([
   { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])
Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:
db.usersessions.aggregate([
   { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
   { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
   { $merge: {
      into: "session_stats_agg",
      whenMatched: [ { $set: {
         "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
         "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
         // Recompute the average from the merged total and count.
         "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
      } } ],
      whenNotMatched: "insert"
   }}
])
The $group stage groups by the userid and calculates:
- The total_time using the $sum operator
- The count using the $sum operator
- The avg_time using the $avg operator

The operation returns the following documents:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
{ "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
{ "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
{ "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }

The $project stage reshapes the output document to mirror the map-reduce output, which has two fields, _id and value. The stage is optional if you do not need to mirror the _id and value structure.

{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }

The $merge stage outputs the results to a session_stats_agg collection. If an existing document has the same _id as the new result, the operation applies the specified pipeline to calculate the total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id in the session_stats_agg collection, the operation inserts the document.

Query the session_stats_agg collection to verify the results:

db.session_stats_agg.find().sort( { _id: 1 } )

The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

Add new documents to the usersessions collection:

db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
   { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
   { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
   { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

Add a $match stage at the start of the pipeline to specify the date filter:

db.usersessions.aggregate([
   { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
   { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
   { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
   { $merge: {
      into: "session_stats_agg",
      whenMatched: [ { $set: {
         "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
         "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
         "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
      } } ],
      whenNotMatched: "insert"
   }}
])

Query the session_stats_agg collection to verify the results:

db.session_stats_agg.find().sort( { _id: 1 } )

The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

Optional. To avoid having to modify the aggregation pipeline's $match date condition each time you run it, you can wrap the aggregation in a helper function:

updateSessionStats = function(startDate) {
   db.usersessions.aggregate([
      { $match: { ts: { $gte: startDate } } },
      { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
      { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
      { $merge: {
         into: "session_stats_agg",
         whenMatched: [ { $set: {
            "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
            "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
            "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
         } } ],
         whenNotMatched: "insert"
      }}
   ]);
};

Then, to run it, pass the start date to the updateSessionStats() function:

updateSessionStats(ISODate('2020-03-05 00:00:00'))
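The whenMatched pipeline in the $merge stage can be checked by hand for a single _id. The following plain-JavaScript sketch models that logic outside MongoDB. The function name mergeWhenMatched is hypothetical: its existing argument stands for the document already in session_stats_agg (the "$value..." paths) and its incoming argument stands for $$new.

```javascript
// Models the whenMatched pipeline for one matched _id.
// `existing` is the stored document; `incoming` plays the role of $$new.
function mergeWhenMatched(existing, incoming) {
  // Every expression reads the original stored values, which is why the
  // pipeline repeats the two sums inside $divide instead of reusing the
  // updated fields.
  const total_time = existing.value.total_time + incoming.value.total_time;
  const count = existing.value.count + incoming.value.count;
  return {
    _id: existing._id,
    value: { total_time, count, avg_time: total_time / count },
  };
}

// User "b": stored result from the first run, plus the 2020-03-05 session.
const stored = { _id: "b", value: { total_time: 230, count: 2, avg_time: 115 } };
const fresh = { _id: "b", value: { total_time: 40, count: 1, avg_time: 40 } };

const mergedDoc = mergeWhenMatched(stored, fresh);
console.log(mergedDoc.value); // { total_time: 270, count: 3, avg_time: 90 }
```

The result matches the document for user "b" in the final query output above.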
See also: