맵-리듀스에서 집계 파이프라인으로의 전환
집계 파이프라인은 맵 리듀스 작업보다 더 나은 성능과 사용성을 제공합니다.
맵 리듀스 작업은 $group
및 $merge
등 집계 파이프라인 단계를 사용해 다시 작성할 수 있습니다.
사용자 지정 기능이 필요한 맵 리듀스 작업의 경우 MongoDB는 $accumulator
및 $function
집계 연산자를 제공합니다. 이러한 연산자를 사용하여 JavaScript에서 사용자 지정 집계 표현식을 정의할 수 있습니다.
맵 리듀스 표현식은 다음 섹션에 표시된 대로 다시 작성할 수 있습니다.
맵 리듀스에서 집계 파이프라인으로의 변환 테이블
이 표는 대략적인 번역일 뿐입니다. 예를 들어, 표는 $project
을 사용하여 mapFunction
의 대략적인 번역을 보여줍니다.
그러나
mapFunction
로직에는 로직에 배열에 대한 반복이 포함된 경우와 같이 추가 단계가 필요할 수 있습니다.function() { this.items.forEach(function(item){ emit(item.sku, 1); }); } 그런 다음 집계 파이프라인에
$unwind
및$project
가 포함됩니다.{ $unwind: "$items "}, { $project: { emits: { key: { "$items.sku" }, value: 1 } } }, $project
의emits
필드는 다른 이름으로 지정할 수 있습니다. 시각적 비교를 위해 필드 이름emits
를 선택했습니다.
맵 축소 | 집계 파이프라인 |
---|---|
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k"}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emit.v"], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" }} } }, { $out: <collection> } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { replace: <collection>, db:<db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k"}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emit.v"], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" }} } }, { $out: { db: <db>, coll: <collection> } } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k"}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emit.v"], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" }} } }, { $merge: { into: { db: <db>, coll: <collection>}, on: "_id" whenMatched: "replace", whenNotMatched: "insert" } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { reduce: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k"}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emit.v"], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" }} } }, { $merge: { into: { db: <db>, coll: <collection> }, on: "_id" whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ "$_id", [ "$value", "$$new.value" ] ], lang: "js" } } } } ] whenNotMatched: "insert" } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k"}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emit.v"], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" }} } } ] ) |
예시
사용자 지정 함수 없이도 $group
, $merge
등과 같은 집계 파이프라인 연산자를 사용하여 다양한 맵 리듀스 표현식을 다시 작성할 수 있습니다. 그러나 설명의 편의를 위해 다음 예시에서는 두 가지 대안을 모두 제공합니다.
예시 1
orders
컬렉션에서의 다음 맵 리듀스 연산은 cust_id
별로 그룹화하고 각 cust_id
에 대한 price
의 합계를 계산합니다.
var mapFunction1 = function() { emit(this.cust_id, this.price); }; var reduceFunction1 = function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); }; db.orders.mapReduce( mapFunction1, reduceFunction1, { out: "map_reduce_example" } )
대안 1: (권장) 맵 리듀스 함수를 동등한 파이프라인 단계로 변환하지 않고 작업을 집계 파이프라인으로 다시 작성할 수 있습니다:
db.orders.aggregate([ { $group: { _id: "$cust_id", value: { $sum: "$price" } } }, { $out: "agg_alternative_1" } ])
대안 2: (설명용으로만 사용) 다음 집계 파이프라인은 $accumulator
를 사용하여 사용자 지정 함수를 정의하는 다양한 맵 리듀스 함수의 변환을 제공합니다.
db.orders.aggregate( [ { $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function { $group: { // equivalent to the reduce function _id: "$emit.key", valuesPrices: { $accumulator: { init: function() { return 0; }, initArgs: [], accumulate: function(state, value) { return state + value; }, accumulateArgs: [ "$emit.value" ], merge: function(state1, state2) { return state1 + state2; }, lang: "js" } } } }, { $out: "agg_alternative_2" } ] )
먼저,
$project
단계는emit
필드가 있는 문서를 출력합니다.emit
필드는 다음 필드를 가진 문서입니다.key
문서에 대한cust_id
값이 포함되어 있습니다.value
문서에 대한price
값이 포함되어 있습니다.
{ "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } } { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } } { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } } { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } } { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } } { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } } { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } } { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } } { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } } { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } } 그런 다음
$group
은$accumulator
연산자를 사용하여 방출된 값을 추가합니다.{ "_id" : "Don Quis", "valuesPrices" : 155 } { "_id" : "Cam Elot", "valuesPrices" : 60 } { "_id" : "Ant O. Knee", "valuesPrices" : 95 } { "_id" : "Busby Bee", "valuesPrices" : 125 } 마지막으로
$out
은 출력을 collectionagg_alternative_2
에 씁니다. 또는$out
대신$merge
를 사용할 수도 있습니다.
예시 2
orders
컬렉션에 대한 다음 맵 리듀스 작업은 item.sku
필드를 기준으로 그룹화하고 주문 수와 각 sku별 총 주문 수량을 계산합니다. 그런 다음 작업은 각 SKU 값별 주문당 평균 수량을 계산하고 결과를 컬렉션에 병합합니다.
var mapFunction2 = function() { for (var idx = 0; idx < this.items.length; idx++) { var key = this.items[idx].sku; var value = { count: 1, qty: this.items[idx].qty }; emit(key, value); } }; var reduceFunction2 = function(keySKU, countObjVals) { reducedVal = { count: 0, qty: 0 }; for (var idx = 0; idx < countObjVals.length; idx++) { reducedVal.count += countObjVals[idx].count; reducedVal.qty += countObjVals[idx].qty; } return reducedVal; }; var finalizeFunction2 = function (key, reducedVal) { reducedVal.avg = reducedVal.qty/reducedVal.count; return reducedVal; }; db.orders.mapReduce( mapFunction2, reduceFunction2, { out: { merge: "map_reduce_example2" }, query: { ord_date: { $gte: new Date("2020-03-01") } }, finalize: finalizeFunction2 } );
대안 1: (권장) 맵 리듀스 함수를 동등한 파이프라인 단계로 변환하지 않고 작업을 집계 파이프라인으로 다시 작성할 수 있습니다:
db.orders.aggregate( [ { $match: { ord_date: { $gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } }, { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } }, { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
대안 2: (설명용으로만 사용) 다음 집계 파이프라인은 $accumulator
를 사용하여 사용자 지정 함수를 정의하는 다양한 맵 리듀스 함수의 변환을 제공합니다.
db.orders.aggregate( [ { $match: { ord_date: {$gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } }, { $group: { _id: "$emit.key", value: { $accumulator: { init: function() { return { count: 0, qty: 0 }; }, initArgs: [], accumulate: function(state, value) { state.count += value.count; state.qty += value.qty; return state; }, accumulateArgs: [ "$emit.value" ], merge: function(state1, state2) { return { count: state1.count + state2.count, qty: state1.qty + state2.qty }; }, finalize: function(state) { state.avg = state.qty / state.count; return state; }, lang: "js"} } } }, { $merge: { into: "agg_alternative_4", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$match
단계에서는ord_date
가new Date("2020-03-01")
이상인 문서만 선택합니다.$unwind
단계에서는items
배열 필드별로 문서를 세분화하여 각 배열 요소에 대한 문서를 출력합니다. 예시:{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } ... $project
단계에서는emit
필드가 있는 문서를 출력합니다.emit
필드는 다음 필드를 가진 문서입니다.key
items.sku
값을 포함value
qty
값과count
값이 있는 문서 포함
{ "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } } { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } ... $group
은$accumulator
연산자를 사용하여 내보낸count
및qty
를 추가하고avg
필드를 계산합니다.{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } } 마지막으로
$merge
는 출력을 컬렉션agg_alternative_4
에 씁니다. 기존 문서에 새 결과와 동일한 키_id
가 있는 경우 작업은 기존 문서를 덮어씁니다. 동일한 키를 가진 기존 문서가 없는 경우 작업이 문서를 삽입합니다.