Docs Menu
Docs Home
/
MongoDBマニュアル
/ /

map-reduce から集計パイプラインへの移行

集計パイプラインは、 map-reduce操作よりも優れたパフォーマンスと使いやすさを提供します。

map-reduce 操作は、 、 などの $group$merge集計パイプライン演算子 を使用して書き換えることができます。

カスタム機能を必要とする map-reduce 操作の場合、MongoDB は$accumulator$functionの集計演算子を提供します。 これらの演算子を使用して、JavaScript でカスタム集計式を定義します。

Map-Reduce 式は、次のセクションに示すように書き直すことができます。

この表はおおよその翻訳です。たとえばこの表は、 $project を使用した mapFunction のおおよその翻訳を示しています。

  • ただし、mapFunction ロジックには、配列の反復処理が含まれる場合など、追加のステージが必要になる場合があります。

    function() {
    this.items.forEach(function(item){ emit(item.sku, 1); });
    }

    その後、集計パイプラインには$unwind$projectが含まれます。

    { $unwind: "$items "},
    { $project: { emits: { key: { "$items.sku" }, value: 1 } } },
  • $projectemitsフィールドは別の名前にすることができます。視覚的な比較のために、フィールド名emitsが選択されています。

Map-Reduce
Aggregation Pipeline
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: <collection>
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: <collection> }
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { replace: <collection>, db:<db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: { db: <db>, coll: <collection> } }
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { merge: <collection>, db: <db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection>},
on: "_id"
whenMatched: "replace",
whenNotMatched: "insert"
} },
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { reduce: <collection>, db: <db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id"
whenMatched: [
{ $project: {
value: { $function: {
body: <reduceFunction>,
args: [
"$_id",
[ "$value", "$$new.value" ]
],
lang: "js"
} }
} }
]
whenNotMatched: "insert"
} },
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { inline: 1 }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} }
] )

カスタム関数なしで、 $group$mergeなどの集計パイプラインステージ を使用して、さまざまな map-reduce 操作を書き換えることができます。 ただし、より明確にするため、次の例では両方の代替手段を説明します。

次の map-reduce 操作は orders コレクションに対して行われ、cust_id によってグループ化され、各 cust_id ごとにprice の合計を計算します。

var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)

選択肢1:(推奨)map-reduce関数を同等のパイプラインステージに変換することなく、処理を集計パイプラインに書き換えることができます。

db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])

選択肢2:(参考のみ)下記の集計パイプラインは、 $accumulatorをカスタム関数の定義に使用し、さまざまな map-reduce 関数の翻訳を提供します。

db.orders.aggregate( [
{ $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
{ $group: { // equivalent to the reduce function
_id: "$emit.key",
valuesPrices: { $accumulator: {
init: function() { return 0; },
initArgs: [],
accumulate: function(state, value) { return state + value; },
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) { return state1 + state2; },
lang: "js"
} }
} },
{ $out: "agg_alternative_2" }
] )
  1. まず、 $projectステージでemit フィールドのあるドキュメントが出力されます。emitフィールドは以下のフィールドのあるドキュメントです。

    • key ドキュメントのcust_id値を含む

    • value ドキュメントのprice値を含む

    { "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
    { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
    { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
    { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
    { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
    { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
    { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
    { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
    { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
    { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
  2. 次に、 $group$accumulator演算子を使用して、出力された値を追加します。

    { "_id" : "Don Quis", "valuesPrices" : 155 }
    { "_id" : "Cam Elot", "valuesPrices" : 60 }
    { "_id" : "Ant O. Knee", "valuesPrices" : 95 }
    { "_id" : "Busby Bee", "valuesPrices" : 125 }
  3. 最後に、 $outは出力をコレクションagg_alternative_2に書き込みます。あるいは、 $out の代わりに$mergeを使用することもできます

ordersコレクションに対する次の map-reduce 操作では、 item.skuフィールドごとにグループ化して、各 SKU の注文数と注文総数を計算します。次に、オペレーションは各 SKU 値の注文あたりの平均数量を計算し、結果を出力コレクションにマージします。

var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = { count: 1, qty: this.items[idx].qty };
emit(key, value);
}
};
var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example2" },
query: { ord_date: { $gte: new Date("2020-03-01") } },
finalize: finalizeFunction2
}
);

選択肢1:(推奨)map-reduce関数を同等のパイプラインステージに変換することなく、処理を集計パイプラインに書き換えることができます。

db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

選択肢2:(参考のみ)下記の集計パイプラインは、 $accumulatorをカスタム関数の定義に使用し、さまざまな map-reduce 関数の翻訳を提供します。

db.orders.aggregate( [
{ $match: { ord_date: {$gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
{ $group: {
_id: "$emit.key",
value: { $accumulator: {
init: function() { return { count: 0, qty: 0 }; },
initArgs: [],
accumulate: function(state, value) {
state.count += value.count;
state.qty += value.qty;
return state;
},
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) {
return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
},
finalize: function(state) {
state.avg = state.qty / state.count;
return state;
},
lang: "js"}
}
} },
{ $merge: {
into: "agg_alternative_4",
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} }
] )
  1. $matchステージでは、 ord_datenew Date("2020-03-01")以上であるドキュメントのみが選択されます。

  2. $unwind ステージでは、items 配列フィールドでドキュメントを分割して、配列要素ごとにドキュメントを出力します。たとえば:

    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    ...
  3. $projectステージは、 emitフィールドを持つドキュメントを出力します。emitフィールドは以下のフィールドのあるドキュメントです。

    • key items.sku値を含む

    • value qtyの値とcount の値を持つドキュメントを含む。

    { "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
    { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
    ...
  4. $group$accumulator演算子を使用して、出力されたcountqtyを加算し、 avgフィールドを計算します。

    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
  5. 最後に、 $mergeは出力をコレクションagg_alternative_4に書き込みます。既存のドキュメントに新しい結果と同じキー_idがある場合、この操作によって既存のドキュメントが上書きされます。 同じキーを持つ既存のドキュメントが存在しない場合は、この操作によってドキュメントが挿入されます。

Tip

以下も参照してください。

戻る

削減のトラブルシューティング