Docs Menu

インクリメンタル map-reduce の実行

注意

Aggregation Pipeline as Alternative

MongoDB 5.0 以降、map-reduce は非推奨になっています。

  • map-reduceの代わりに、集計パイプラインを使用する必要があります。 集計パイプラインは、map-reduce よりもパフォーマンスとユーザビリティが優れています。

  • map-reduce 操作は、$group$merge などの集計パイプライン ステージ を使用して書き換えることができます。

  • カスタム機能を必要とする map-reduce 操作には、 $accumulator$functionの集計演算子を使用できます。 これらの演算子を使用して、JavaScript でカスタム集計式を定義できます。

map-reduce を集計パイプラインに置き換える例については、以下を参照してください。

This section has an example aggregation pipeline alternative to map-reduce that does not use a custom function. For an example that uses a custom function, see map-reduce から集計パイプラインへの移行.

To perform map-reduce operations, MongoDB provides the mapReduce command and, in mongosh, the db.collection.mapReduce() wrapper method.

If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.

To perform incremental map-reduce:

  1. Run a map-reduce job over the current collection and output the result to a separate collection.

  2. When you have more data to process, run subsequent map-reduce jobs with:

    • the query parameter that specifies conditions that match only the new documents.

    • the out parameter that specifies the reduce action to merge the new results into the existing output collection.

Consider the following example where you schedule a map-reduce operation on a usersessions collection to run at the end of each day.

The usersessions collection contains documents that log users' sessions each day, for example:

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Run the first map-reduce operation as follows:

  1. Define the map function that maps the userid to an object that contains the fields total_time, count, and avg_time:

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. Define the corresponding reduce function with two arguments key and values to calculate the total time and the count. The key corresponds to the userid, and the values is an array whose elements corresponds to the individual objects mapped to the userid in the mapFunction.

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. Define the finalize function with two arguments key and reducedValue. The function modifies the reducedValue document to add another field average and returns the modified document.

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. Perform map-reduce on the usersessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stats. If the session_stats collection already exists, the operation will replace the contents:

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. session_stats コレクションをクエリして、結果を検証します。

    db.session_stats.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

Later, as the usersessions collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions collection:

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

At the end of the day, perform incremental map-reduce on the usersessions collection, but use the query field to select only the new documents. Output the results to the collection session_stats, but reduce the contents with the results of the incremental map-reduce:

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

session_stats コレクションをクエリして、結果を検証します。

db.session_stats.find().sort( { _id: 1 } )

この操作により、次のドキュメントが返されます。

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

As an alternative to map-reduce, you can use an aggregation pipeline that combines $group and $merge stages to achieve the same result in a more flexible operation.

Recreate the usersessions collection:

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. The $group groups by the userid and calculates:

    • The total_time using the $sum operator

    • The count using the $sum operator

    • The avg_time using the $avg operator

    この操作により、次のドキュメントが返されます。

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. The $project stage reshapes the output document to mirror the map-reduce's output to have two fields _id and value. The stage is optional if you do not need to mirror the _id and value structure.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. The $merge stage outputs the results to a session_stats_agg collection. If an existing document has the same _id as the new result, the operation applies the specified pipeline to calculate the total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id in the session_stats_agg, the operation inserts the document.

  4. session_stats_agg コレクションをクエリして、結果を検証します。

    db.session_stats_agg.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. Add new documents to the usersessions collection:

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. Add a $match stage at the start of the pipeline to specify the date filter:

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. session_stats_agg コレクションをクエリして、結果を検証します。

    db.session_stats_agg.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. Optional. To avoid having to modify the aggregation pipeline's $match date condition each time you run, you can define wrap the aggregation in a helper function:

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    Then, to run, you would just pass in the start date to the updateSessionStats() function:

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

以下も参照してください。