문서 메뉴
문서 홈
/
MongoDB 매뉴얼
/ /

증분 맵-리듀스 수행

이 페이지의 내용

  • 데이터 설정
  • 초기 맵 리듀스 collection
  • 후속 증분 맵-리듀스
  • 집계 대안

참고

대안으로서의 애그리게이션 파이프라인

MongoDB 5 부터 시작.0 맵 리듀스 는 더 이상 사용되지 않습니다.

맵 리듀스 대안으로서의 애그리게이션 파이프라인 예시는 다음을 참조하세요.

이 섹션에는 사용자 지정 함수를 사용하지 않는 맵 리듀스 대신 사용할 수 있는 집계 파이프라인 예시가 있습니다. 사용자 지정 함수를 사용하는 예제는 맵-리듀스에서 집계 파이프라인으로의 섹션을 참조하세요.

맵 리듀스 작업을 수행하기 위해 MongoDB는 mapReduce 명령과 mongosh 에서 db.collection.mapReduce() 래퍼 메서드를 제공합니다.

맵 리듀스 데이터 세트가 지속적으로 증가하는 경우 매번 전체 데이터 세트에 대해 맵 리듀스 작업을 수행하는 대신 증분 맵 리듀스를 수행하는 것이 좋습니다.

증분 맵 리듀스를 수행하려면 다음을 수행합니다.

  1. 현재 collection에 대해 맵 리듀스 작업을 실행하고 결과를 별도의 collection에 출력합니다.

  2. 처리할 데이터가 더 많은 경우 다음을 사용하여 후속 맵 리듀스 작업을 실행합니다.

    • 새 문서와 일치하는 조건을 지정하는 query 매개변수입니다.

    • 새 결과를 기존 출력 collection에 병합하는 reduce 조치를 지정하는 out 매개 변수입니다.

매일 하루가 끝날 때까지 usersessions 컬렉션에 대한 맵 리듀스 작업이 실행되도록 예약하는 다음 예시를 생각해 보세요.

usersessions collection에는 매일 사용자의 세션을 기록하는 문서가 포함되어 있습니다.

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

다음과 같이 첫 번째 맵 리듀스 작업을 실행합니다.

  1. useridtotal_time, countavg_time 필드가 포함된 객체에 매핑하는 맵 함수를 정의합니다.

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. 두 개의 인수 keyvalues 을 사용하여 해당 축소 함수를 정의하여 총 시간과 횟수를 계산합니다. keyuserid 에 해당하고, values 는 요소가 mapFunctionuserid 에 매핑된 개별 객체에 해당하는 배열입니다.

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. 두 개의 인수 keyreducedValue 을 사용하여 최종 함수를 정의합니다. 이 함수는 reducedValue 문서를 수정하여 다른 필드 average 을 추가하고 수정된 문서를 반환합니다.

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. mapFunction , reduceFunctionfinalizeFunction 함수를 사용하여 usersessions 컬렉션에서 맵 리듀스를 수행합니다. 결과를 collection session_stats 에 출력합니다. session_stats collection이 이미 존재하는 경우 작업은 해당 콘텐츠를 대체합니다.

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. session_stats 컬렉션을 쿼리하여 결과를 확인합니다.

    db.session_stats.find().sort( { _id: 1 } )

    이 작업은 다음 문서를 반환합니다.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

나중에 usersessions collection이 커짐에 따라 추가 맵 리듀스 작업을 실행할 수 있습니다. 예를 들어, usersessions collection에 새 문서를 추가합니다.

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

하루가 끝나면 usersessions collection에 대해 증분 맵 리듀스를 수행하되 query 필드를 사용하여 새 문서만 선택합니다. 결과를 collection session_stats 에 출력하지만 reduce 내용을 증분 맵 리듀스 결과로 출력합니다.

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

session_stats 컬렉션을 쿼리하여 결과를 확인합니다.

db.session_stats.find().sort( { _id: 1 } )

이 작업은 다음 문서를 반환합니다.

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

맵 리듀스 대신 단계와 단계를 결합하는 $group 집계 $merge 파이프라인 을 사용하여 보다 유연한 작업으로 동일한 결과를 얻을 수 있습니다.

usersessions collection을 다시 생성합니다.

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

사용 가능한 집계 파이프라인 연산자를 사용하면 사용자 지정 함수를 정의하지 않고도 맵 리듀스 예제를 다시 작성할 수 있습니다.

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. $groupuserid 을 기준으로 그룹화하고 다음을 계산합니다.

    • $sum 연산자를 사용하는 total_time

    • $sum 연산자를 사용하는 count

    • $avg 연산자를 사용하는 avg_time

    이 작업은 다음 문서를 반환합니다.

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. $project 단계에서는 맵 리듀스의 출력을 미러링하여 _idvalue 두 필드를 갖도록 출력 문서를 재구성합니다. _idvalue 구조를 미러링할 필요가 없는 경우 이 단계는 선택 사항입니다.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. 단계는 $merge 결과를 session_stats_agg collection에 출력합니다. 기존 문서에 새 결과와 동일한 _id 이(가) 있는 경우 작업은 지정된 파이프라인을 적용하여 결과와 기존 문서에서 total_time, 개수, avg_time을 계산합니다. session_stats_agg 에 동일한 _id 를 가진 기존 문서가 없는 경우 작업은 문서를 삽입합니다.

  4. session_stats_agg 컬렉션을 쿼리하여 결과를 확인합니다.

    db.session_stats_agg.find().sort( { _id: 1 } )

    이 작업은 다음 문서를 반환합니다.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. usersessions collection에 새 문서를 추가합니다.

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. 파이프라인 시작 부분에 $match 단계를 추가하여 날짜 필터를 지정합니다.

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. session_stats_agg 컬렉션을 쿼리하여 결과를 확인합니다.

    db.session_stats_agg.find().sort( { _id: 1 } )

    이 작업은 다음 문서를 반환합니다.

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. 선택 사항. 실행할 때마다 집계 파이프라인의 $match 날짜 조건을 수정하지 않으려면 헬퍼 함수에서 집계 래핑을 정의할 수 있습니다.

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    그런 다음 실행하려면 updateSessionStats() 함수에 시작 날짜를 전달하기만 하면 됩니다.

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

← 맵-리듀스 예제