Docs Menu
Docs Home
/
MongoDBマニュアル
/ /

インクリメンタル map-reduce の実行

項目一覧

  • データセットアップ
  • 現在のコレクションの初期 map-reduce
  • その後のインクリメンタル map-reduce
  • 集計の代替手段

注意

代替手段としての集計パイプライン

MongoDB 5.0以降、 map-reduceは非推奨です。

map-reduce を集計パイプラインに置き換える例については、以下を参照してください。

このセクションでは、カスタム関数を使用しない map-reduce に代わる集計パイプラインの例を示します。 カスタム関数を使用する例については、「 map-reduce から集計パイプラインへの移行 」を参照してください。

map-reduce 操作を実行するために、MongoDB はmapReduceコマンドと、 mongoshではdb.collection.mapReduce()ラッパー メソッドを提供します。

map-reduce データセットが常に増加する場合は、毎回データセット全体に対して map-reduce 操作を実行するのではなく、増分 map-reduce を実行することをお勧めします。

インクリメンタル map-reduce を実行するには

  1. 現在のコレクションに対して map-reduce ジョブを実行し、その結果を別のコレクションに出力します。

  2. 処理するデータが増えたら、以下を使用して後続の map-reduce ジョブを実行します。

    • 新しいドキュメントのみに一致する条件を指定するqueryパラメーター。

    • 新しい結果を既存の出力コレクションにマージするためのreduceアクションを指定するoutパラメーター。

usersessionsコレクションに対して map-reduce 操作を毎日の終わりに実行するようにスケジュールする次の例を考えてみましょう。

usersessionsコレクションには、ユーザーのセッションをログに記録するドキュメントが含まれています。以下はその例です。

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

最初の map-reduce 操作は次のように実行します。

  1. フィールドtotal_timecountavg_timeのフィールドを含むオブジェクトにuseridをマッピングする map 関数を定義します。

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. 合計時間とカウントを計算するには、対応する reduce 関数を 2 つの引数keyvaluesで定義します。 keyuseridに対応し、 valuesmapFunction内のuseridにマップされた個々のオブジェクトに対応する要素である配列です。

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. 2 つの引数keyreducedValueを使用して finalize 関数を定義します。 この関数は、 reducedValueドキュメントを変更して別のフィールドaverageを追加し、変更されたドキュメントを返します。

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. mapFunctionreduceFunctionfinalizeFunction関数を使用して、 usersessionsコレクションに対して map-reduce を実行します。 結果をコレクションsession_statsに出力します。 session_statsコレクションがすでに存在する場合、この操作によって内容が置き換えられます。

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. session_stats コレクションをクエリして、結果を検証します。

    db.session_stats.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

後でusersessionsコレクションが大きくなるにつれて、追加の map-reduce 操作を実行できます。 For example, add new documents to the usersessions collection:

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

その日の終わりに、 usersessionsコレクションに対してインクリメンタル map-reduce を実行しますが、 queryフィールドを使用して新しいドキュメントのみを選択します。 結果をコレクションsession_statsに出力しますが、増分 map-reduce の結果を含む内容はreduceに出力されます。

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

session_stats コレクションをクエリして、結果を検証します。

db.session_stats.find().sort( { _id: 1 } )

この操作により、次のドキュメントが返されます。

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

map-reduce の代わりに、 } $groupステージと ステージを組み合わせた$merge 集計パイプライン を使用すると、より柔軟な操作で同じ結果が得られます。

usersessionsコレクションを再作成します。

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

次のとおり利用可能な集約パイプライン演算子を使用すると、カスタム関数を定義しなくても map-reduce の例を書き換えることができます。

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. $groupuseridでグループ化し、次の計算を行います。

    • $sum演算子を使用するtotal_time

    • $sum演算子を使用するcount

    • $avg演算子を使用するavg_time

    この操作により、次のドキュメントが返されます。

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. $projectステージでは、出力ドキュメントを再形成して、map-reduce の出力をミラーリングし、2 つのフィールド_idvalueを含めます。 _idvalue構造をミラーリングする必要がない場合は、 ステージは任意です。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. $mergeステージは結果をsession_stats_aggコレクションに出力します。 既存のドキュメントに新しい結果と同じ_id が含まれている場合、操作は指定されたパイプラインを適用して、結果と既存のドキュメントから total_time、count、avg_time を計算します。 session_stats_aggに同じ_idを持つ既存のドキュメントが存在しない場合は、この操作によってドキュメントが挿入されます。

  4. session_stats_agg コレクションをクエリして、結果を検証します。

    db.session_stats_agg.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. usersessionsコレクションに新しいドキュメントを追加します。

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. 日付フィルターを指定するには、パイプラインの先頭に$matchステージを追加します。

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. session_stats_agg コレクションをクエリして、結果を検証します。

    db.session_stats_agg.find().sort( { _id: 1 } )

    この操作により、次のドキュメントが返されます。

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. 任意。 実行するたびに集計パイプラインの$match日付条件を変更しないで済むように、ヘルパー関数で集計をラップするように定義できます。

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    次に実行するには、開始日をupdateSessionStats()関数に渡すだけです。

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

Tip

以下も参照してください。

戻る