Menu Docs
Página inicial do Docs
/
Manual do MongoDB
/ /

Executar Redução incremental de mapa

Nesta página

  • Configuração de dados
  • map-reduce inicial da collection atual
  • Mapeamento incremental subsequente-redução
  • Alternativa de aggregation

Observação

Pipeline de agregação como alternativa

A partir do MongoDB , 5.0, map-reduce está obsoleto:

Para obter exemplos de alternativas de aggregation pipeline para map-reduce, consulte:

Esta seção tem um exemplo de alternativa de aggregation pipeline para map-reduce que não usa uma função personalizada. Para obter um exemplo que usa uma função personalizada, consulte map-reduce to pipeline de agregação.

Para executar operações de map-reduce , o MongoDB fornece o comando mapReduce e, no mongosh, o método wrapper db.collection.mapReduce() .

Se o conjunto de dados de map-reduce estiver crescendo constantemente, você pode querer executar um map-reduce incremental em vez de executar a operação de map-reduce sobre todo o conjunto de dados a cada vez.

Para executar o map-reduce incremental:

  1. Execute uma tarefa de map-reduce sobre a collection atual e envie o resultado para uma collection separada.

  2. Quando você tiver mais dados para processar, execute trabalhos de redução de mapa subsequentes com:

    • o parâmetro query que especifica condições que correspondem somente aos novos documentos.

    • o parâmetro out que especifica a ação reduce para mesclar os novos resultados na coleta de saída existente.

Considere o exemplo a seguir, em que você agenda uma operação de map-reduce em uma collection usersessions para ser executada no final de cada dia.

A collection usersessions contém documento que registram as sessões dos usuários todos os dias, por exemplo:

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Execute a primeira operação de map-reduce da seguinte forma:

  1. Defina a função de mapa que mapeia o userid para um objeto que contém os campos total_time, count e avg_time:

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. Defina a função de redução correspondente com dois argumentos key e values para calcular o tempo total e a contagem. O key corresponde ao userid, e o values é uma array cujos elementos correspondem aos objetos individuais mapeados para o userid no mapFunction.

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. Defina a função de finalização com dois argumentos key e reducedValue. A função modifica o documento reducedValue para adicionar outro campo average e retorna o documento modificado.

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. Execute o map-reduce na collection do usersessions utilizando as funções mapFunction, reduceFunction e finalizeFunction . Enviar os resultados para uma collection session_stats. Se a collection session_stats já existir, a operação substituirá o conteúdo:

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. Consulte a coleção session_stats para verificar os resultados:

    db.session_stats.find().sort( { _id: 1 } )

    A operação retorna os seguintes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

Posteriormente, à medida que a collection usersessions cresce, você poderá executar operações adicionais de map-reduce. Por exemplo, adicione novos documentos à collection usersessions :

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

No final do dia, execute a redução de mapa incremental na coleção do usersessions , mas utilize o campo query para selecionar somente os novos documentos. Envie os resultados para a collection session_stats, mas reduce o conteúdo com os resultados do map-reduce incremental:

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

Consulte a coleção session_stats para verificar os resultados:

db.session_stats.find().sort( { _id: 1 } )

A operação retorna os seguintes documentos:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

Como alternativa ao map-reduce, você pode usar um aggregation pipeline que combina os estágios $group e $merge para obter o mesmo resultado em uma operação mais flexível.

Recriar a collection usersessions :

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Usando os operadores de aggregation pipeline disponíveis, você pode reescrever o exemplo de map-reduce sem definir funções personalizadas:

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. O $group agrupa pelo userid e calcula:

    • O total_time usando o operador $sum

    • O count usando o operador $sum

    • O avg_time usando o operador $avg

    A operação retorna os seguintes documentos:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. O estágio $project remodela o documento de saída para espelhar a saída do map-reduce para ter dois campos _id e value. O estágio é opcional se você não precisar espelhar a estrutura _id e value .

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. O estágio $merge gera resultados para uma collection session_stats_agg . Se um documento existente tiver o mesmo _id que o novo resultado, a operação aplicará o pipeline especificado para calcular o total_time, count e avg_time a partir do resultado e do documento existente. Se não houver nenhum documento existente com o mesmo _id no session_stats_agg, a operação inserirá o documento.

  4. Consulte a coleção session_stats_agg para verificar os resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    A operação retorna os seguintes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. Adicionar novo documento à collection usersessions :

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. Adicione um estágio $match no início do pipeline para especificar o filtro de data:

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. Consulte a coleção session_stats_agg para verificar os resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    A operação retorna os seguintes documentos:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. Opcional. Para evitar a necessidade de modificar a condição de data $match do aggregation pipeline toda vez que você executa, você pode definir o encapsulamento do aggregation em uma função auxiliar:

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    Em seguida, para executar, você apenas passaria a data de início para a função updateSessionStats() :

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

Dica

Veja também:

Voltar

Exemplos