Executar Redução incremental de mapa
Nesta página
Observação
Pipeline de agregação como alternativa
A partir do MongoDB , 5.0, map-reduce está obsoleto:
Em vez de map-reduce, você deve usar um aggregation pipeline. aggregation pipeline fornece melhor desempenho e usabilidade do que a redução de mapa.
Você pode reescrever operações de redução de mapa utilizando estágios do pipeline de agregação, como
$group
,$merge
e outros.Nas operações de map-reduce que exigem funcionalidade personalizada, você pode usar os operadores de agregação
$accumulator
e$function
. Você pode usar esses operadores para definir expressões de agregação personalizadas no JavaScript.
Para obter exemplos de alternativas de aggregation pipeline para map-reduce, consulte:
Esta seção tem um exemplo de alternativa de aggregation pipeline para map-reduce que não usa uma função personalizada. Para obter um exemplo que usa uma função personalizada, consulte map-reduce to pipeline de agregação.
Para executar operações de redução de mapa, o MongoDB fornece o comando mapReduce
e, no mongosh
, o método wrapper do db.collection.mapReduce()
.
Se o conjunto de dados de map-reduce estiver crescendo constantemente, você pode querer executar um map-reduce incremental em vez de executar a operação de map-reduce sobre todo o conjunto de dados a cada vez.
Para executar o map-reduce incremental:
Execute uma tarefa de map-reduce sobre a collection atual e envie o resultado para uma collection separada.
Quando você tiver mais dados para processar, execute trabalhos de redução de mapa subsequentes com:
o parâmetro
query
que especifica condições que correspondem somente aos novos documentos.o parâmetro
out
que especifica a açãoreduce
para mesclar os novos resultados na coleta de saída existente.
Considere o exemplo a seguir, em que você agenda uma operação de map-reduce em uma collection usersessions
para ser executada no final de cada dia.
Configuração de dados
A collection usersessions
contém documento que registram as sessões dos usuários todos os dias, por exemplo:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
map-reduce inicial da collection atual
Execute a primeira operação de map-reduce da seguinte forma:
Defina a função de mapa que mapeia o
userid
para um objeto que contém os campostotal_time
,count
eavg_time
:var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; Defina a função de redução correspondente com dois argumentos
key
evalues
para calcular o tempo total e a contagem. Okey
corresponde aouserid
, e ovalues
é uma array cujos elementos correspondem aos objetos individuais mapeados para ouserid
nomapFunction
.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; Defina a função de finalização com dois argumentos
key
ereducedValue
. A função modifica o documentoreducedValue
para adicionar outro campoaverage
e retorna o documento modificado.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; Execute o map-reduce na collection do
usersessions
utilizando as funçõesmapFunction
,reduceFunction
efinalizeFunction
. Enviar os resultados para uma collectionsession_stats
. Se a collectionsession_stats
já existir, a operação substituirá o conteúdo:db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) Consulte a coleção
session_stats
para verificar os resultados:db.session_stats.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
Mapeamento incremental subsequente-redução
Posteriormente, à medida que a collection usersessions
cresce, você poderá executar operações adicionais de map-reduce. Por exemplo, adicione novos documentos à collection usersessions
:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
No final do dia, execute a redução de mapa incremental na coleção do usersessions
, mas utilize o campo query
para selecionar somente os novos documentos. Envie os resultados para a collection session_stats
, mas reduce
o conteúdo com os resultados do map-reduce incremental:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
Consulte a coleção session_stats
para verificar os resultados:
db.session_stats.find().sort( { _id: 1 } )
A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
Alternativa de aggregation
Como alternativa ao map-reduce, você pode usar um aggregation pipeline que combina os estágios $group
e $merge
para obter o mesmo resultado em uma operação mais flexível.
Recriar a collection usersessions
:
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Usando os operadores de aggregation pipeline disponíveis, você pode reescrever o exemplo de map-reduce sem definir funções personalizadas:
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
O
$group
agrupa pelouserid
e calcula:A operação retorna os seguintes documentos:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } O estágio
$project
remodela o documento de saída para espelhar a saída do map-reduce para ter dois campos_id
evalue
. O estágio é opcional se você não precisar espelhar a estrutura_id
evalue
.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } O estágio
$merge
gera resultados para uma collectionsession_stats_agg
. Se um documento existente tiver o mesmo_id
que o novo resultado, a operação aplicará o pipeline especificado para calcular o total_time, count e avg_time a partir do resultado e do documento existente. Se não houver nenhum documento existente com o mesmo_id
nosession_stats_agg
, a operação inserirá o documento.Consulte a coleção
session_stats_agg
para verificar os resultados:db.session_stats_agg.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } Adicionar novo documento à collection
usersessions
:db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) Adicione um estágio
$match
no início do pipeline para especificar o filtro de data:db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) Consulte a coleção
session_stats_agg
para verificar os resultados:db.session_stats_agg.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } Opcional. Para evitar a necessidade de modificar a condição de data
$match
do aggregation pipeline toda vez que você executa, você pode definir o encapsulamento do aggregation em uma função auxiliar:updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; Em seguida, para executar, você apenas passaria a data de início para a função
updateSessionStats()
:updateSessionStats(ISODate('2020-03-05 00:00:00'))