インクリメンタル map-reduce の実行
注意
代替手段としての集計パイプライン
MongoDB 5.0以降、 map-reduceは非推奨です。
map-reduceの代わりに、集計パイプラインを使用する必要があります。 集計パイプラインは、map-reduce よりもパフォーマンスとユーザビリティが優れています。
$group
$merge
などの集約パイプライン ステージ を使用して、map-reduce操作を書き換えることができます。カスタム機能を必要とする map-reduce 操作には、
$accumulator
と$function
の集計演算子を使用できます。 これらの演算子を使用して、JavaScript でカスタム集計式を定義できます。
map-reduce を集計パイプラインに置き換える例については、以下を参照してください。
このセクションでは、カスタム関数を使用しない map-reduce に代わる集計パイプラインの例を示します。 カスタム関数を使用する例については、「 map-reduce から集計パイプラインへの移行 」を参照してください。
map-reduce 操作を実行するために、MongoDB はmapReduce
コマンドと、 mongosh
ではdb.collection.mapReduce()
ラッパー メソッドを提供します。
map-reduce データセットが常に増加する場合は、毎回データセット全体に対して map-reduce 操作を実行するのではなく、増分 map-reduce を実行することをお勧めします。
インクリメンタル map-reduce を実行するには
現在のコレクションに対して map-reduce ジョブを実行し、その結果を別のコレクションに出力します。
処理するデータが増えたら、以下を使用して後続の map-reduce ジョブを実行します。
新しいドキュメントのみに一致する条件を指定する
query
パラメーター。新しい結果を既存の出力コレクションにマージするための
reduce
アクションを指定するout
パラメーター。
usersessions
コレクションに対して map-reduce 操作を毎日の終わりに実行するようにスケジュールする次の例を考えてみましょう。
データセットアップ
usersessions
コレクションには、ユーザーのセッションをログに記録するドキュメントが含まれています。以下はその例です。
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
現在のコレクションの初期 map-reduce
最初の map-reduce 操作は次のように実行します。
フィールド
total_time
、count
、avg_time
のフィールドを含むオブジェクトにuserid
をマッピングする map 関数を定義します。var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; 合計時間とカウントを計算するには、対応する reduce 関数を 2 つの引数
key
とvalues
で定義します。key
はuserid
に対応し、values
はmapFunction
内のuserid
にマップされた個々のオブジェクトに対応する要素である配列です。var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; 2 つの引数
key
とreducedValue
を使用して finalize 関数を定義します。 この関数は、reducedValue
ドキュメントを変更して別のフィールドaverage
を追加し、変更されたドキュメントを返します。var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; mapFunction
、reduceFunction
、finalizeFunction
関数を使用して、usersessions
コレクションに対して map-reduce を実行します。 結果をコレクションsession_stats
に出力します。session_stats
コレクションがすでに存在する場合、この操作によって内容が置き換えられます。db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) session_stats
コレクションをクエリして、結果を検証します。db.session_stats.find().sort( { _id: 1 } ) この操作により、次のドキュメントが返されます。
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
その後のインクリメンタル map-reduce
後でusersessions
コレクションが大きくなるにつれて、追加の map-reduce 操作を実行できます。 For example, add new documents to the usersessions
collection:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
その日の終わりに、 usersessions
コレクションに対してインクリメンタル map-reduce を実行しますが、 query
フィールドを使用して新しいドキュメントのみを選択します。 結果をコレクションsession_stats
に出力しますが、増分 map-reduce の結果を含む内容はreduce
に出力されます。
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
session_stats
コレクションをクエリして、結果を検証します。
db.session_stats.find().sort( { _id: 1 } )
この操作により、次のドキュメントが返されます。
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
集計の代替手段
map-reduce の代わりに、 } $group
ステージと ステージを組み合わせた$merge
集計パイプライン を使用すると、より柔軟な操作で同じ結果が得られます。
usersessions
コレクションを再作成します。
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
次のとおり利用可能な集約パイプライン演算子を使用すると、カスタム関数を定義しなくても map-reduce の例を書き換えることができます。
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
$group
はuserid
でグループ化し、次の計算を行います。この操作により、次のドキュメントが返されます。
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } $project
ステージでは、出力ドキュメントを再形成して、map-reduce の出力をミラーリングし、2 つのフィールド_id
とvalue
を含めます。_id
とvalue
構造をミラーリングする必要がない場合は、 ステージは任意です。{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } $merge
ステージは結果をsession_stats_agg
コレクションに出力します。 既存のドキュメントに新しい結果と同じ_id
が含まれている場合、操作は指定されたパイプラインを適用して、結果と既存のドキュメントから total_time、count、avg_time を計算します。session_stats_agg
に同じ_id
を持つ既存のドキュメントが存在しない場合は、この操作によってドキュメントが挿入されます。session_stats_agg
コレクションをクエリして、結果を検証します。db.session_stats_agg.find().sort( { _id: 1 } ) この操作により、次のドキュメントが返されます。
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } usersessions
コレクションに新しいドキュメントを追加します。db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) 日付フィルターを指定するには、パイプラインの先頭に
$match
ステージを追加します。db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) session_stats_agg
コレクションをクエリして、結果を検証します。db.session_stats_agg.find().sort( { _id: 1 } ) この操作により、次のドキュメントが返されます。
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } 任意。 実行するたびに集計パイプラインの
$match
日付条件を変更しないで済むように、ヘルパー関数で集計をラップするように定義できます。updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; 次に実行するには、開始日を
updateSessionStats()
関数に渡すだけです。updateSessionStats(ISODate('2020-03-05 00:00:00'))