증분 맵-리듀스 수행
참고
대안으로서의 집계 파이프라인
MongoDB 5.0 부터 맵 리듀스 는 더 이상 사용되지 않습니다.
맵 리듀스 대신 집계 파이프라인을 사용해야 합니다. 집계 파이프라인은 맵 리듀스보다 성능과 유용성 측면에서 더 우수합니다.
4}
$group
$merge
, 등과 같은 집계 파이프라인 단계를 사용하여 맵 축소 연산을 다시 작성할 수 있습니다.사용자 지정 기능이 필요한 맵 리듀스 작업의 경우
$accumulator
및$function
집계 연산자를 사용할 수 있습니다. 이러한 연산자를 사용하여 JavaScript에서 사용자 지정 집계 표현식을 정의할 수 있습니다.
맵 리듀스 대안으로서의 집계 파이프라인 예시는 다음을 참조하세요.
이 섹션에는 사용자 지정 함수를 사용하지 않는 맵 리듀스 대신 사용할 수 있는 집계 파이프라인 예시가 있습니다. 사용자 지정 함수를 사용하는 예제는 맵-리듀스에서 집계 파이프라인으로의 섹션을 참조하세요.
맵 리듀스 작업을 수행하기 위해 MongoDB는 mapReduce
명령과 mongosh
에서 db.collection.mapReduce()
래퍼 메서드를 제공합니다.
맵 리듀스 데이터 세트가 지속적으로 증가하는 경우 매번 전체 데이터 세트에 대해 맵 리듀스 작업을 수행하는 대신 증분 맵 리듀스를 수행하는 것이 좋습니다.
증분 맵 리듀스를 수행하려면 다음을 수행합니다.
현재 collection에 대해 맵 리듀스 작업을 실행하고 결과를 별도의 collection에 출력합니다.
처리할 데이터가 더 많은 경우 다음을 사용하여 후속 맵 리듀스 작업을 실행합니다.
새 문서와 만 일치하는 조건을 지정하는
query
매개변수입니다.새 결과를 기존 출력 collection에 병합하는
reduce
조치를 지정하는out
매개 변수입니다.
매일 하루가 끝날 때까지 usersessions
컬렉션에 대한 맵 리듀스 작업이 실행되도록 예약하는 다음 예시를 생각해 보세요.
데이터 설정
usersessions
collection에는 매일 사용자의 세션을 기록하는 문서가 포함되어 있습니다.
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
초기 맵 리듀스 collection
다음과 같이 첫 번째 맵 리듀스 작업을 실행합니다.
userid
을total_time
,count
및avg_time
필드가 포함된 객체에 매핑하는 맵 함수를 정의합니다.var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; 두 개의 인수
key
와values
을 사용하여 해당 축소 함수를 정의하여 총 시간과 횟수를 계산합니다.key
는userid
에 해당하고,values
는 요소가mapFunction
의userid
에 매핑된 개별 객체에 해당하는 배열입니다.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; 두 개의 인수
key
및reducedValue
을 사용하여 최종 함수를 정의합니다. 이 함수는reducedValue
문서를 수정하여 다른 필드average
을 추가하고 수정된 문서를 반환합니다.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; mapFunction
,reduceFunction
및finalizeFunction
함수를 사용하여usersessions
컬렉션에서 맵 리듀스를 수행합니다. 결과를 collectionsession_stats
에 출력합니다.session_stats
collection이 이미 존재하는 경우 작업은 해당 콘텐츠를 대체합니다.db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) session_stats
컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
후속 증분 맵-리듀스
나중에 usersessions
collection이 커짐에 따라 추가 맵 리듀스 작업을 실행할 수 있습니다. 예를 들어, usersessions
collection에 새 문서를 추가합니다.
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
하루가 끝나면 usersessions
collection에 대해 증분 맵 리듀스를 수행하되 query
필드를 사용하여 새 문서만 선택합니다. 결과를 collection session_stats
에 출력하지만 reduce
내용을 증분 맵 리듀스 결과로 출력합니다.
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
session_stats
컬렉션을 쿼리하여 결과를 확인합니다.
db.session_stats.find().sort( { _id: 1 } )
이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
집계 대안
맵 리듀스 대신 단계와 단계를 결합하는 $group
집계 $merge
파이프라인 을 사용하여 보다 유연한 작업으로 동일한 결과를 얻을 수 있습니다.
usersessions
collection을 다시 생성합니다.
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
사용 가능한 집계 파이프라인 연산자를 사용하면 사용자 지정 함수를 정의하지 않고도 맵 리듀스 예제를 다시 작성할 수 있습니다.
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
$group
는userid
을 기준으로 그룹화하고 다음을 계산합니다.이 작업은 다음 문서를 반환합니다.
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } $project
단계에서는 맵 리듀스의 출력을 미러링하여_id
와value
두 필드를 갖도록 출력 문서를 재구성합니다._id
및value
구조를 미러링할 필요가 없는 경우 이 단계는 선택 사항입니다.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } 단계는
$merge
결과를session_stats_agg
collection에 출력합니다. 기존 문서에 새 결과와 동일한_id
이(가) 있는 경우 작업은 지정된 파이프라인을 적용하여 결과와 기존 문서에서 total_time, 개수, avg_time을 계산합니다.session_stats_agg
에 동일한_id
를 가진 기존 문서가 없는 경우 작업은 문서를 삽입합니다.session_stats_agg
컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats_agg.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } usersessions
collection에 새 문서를 추가합니다.db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) 파이프라인 시작 부분에
$match
단계를 추가하여 날짜 필터를 지정합니다.db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) session_stats_agg
컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats_agg.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } 선택 사항. 실행할 때마다 집계 파이프라인의
$match
날짜 조건을 수정하지 않으려면 헬퍼 함수에서 집계 래핑을 정의할 수 있습니다.updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; 그런 다음 실행하려면
updateSessionStats()
함수에 시작 날짜를 전달하기만 하면 됩니다.updateSessionStats(ISODate('2020-03-05 00:00:00'))