Pipelines de agregação complexos com Baunilha PHP e MongoDB
Avalie esse Início rápido
No mundo dos dados, o processamento de grandes volumes é um desafio e uma necessidade. Seja analisando dados de vendas para identificar tendências, resumindo as atividades do usuário para fornecer uma experiência personalizada ou transformando dados brutos em insights significativos, a capacidade de processar e agregar dados é crucial. Vamos entender isso com um exemplo.
Imagine que você tenha um conjunto de dados de filmes, uma collection que inclui amplas informações sobre filmes, avaliações de usuários e classificações. Para melhorar a experiência e o engajamento do usuário, convém identificar os gêneros mais populares, entender a média de classificação dos filmes, determinar a proporção de avaliações para comentários e assim por diante.
Para obter esses insights, escrever queries para métricas individuais seria ineficiente e complexo. Além disso, o processamento de grandes conjuntos de dados em trânsito seria uma tarefa de computação de uso intensivo. É aqui que a agregação se torna crucial.
Na próxima seção, vamos entender o que é agregação, como ela é obtida no MongoDB e quais são os pipelines e estágios do ponto de vista teórico e, em seguida, mergulharemos em alguns exercícios práticos com drivers PHP.
A agregação é um processo de transformar e resumir grandes conjuntos de dados para extrair insights significativos. Envolve a realização de operações como contagem, agrupamento e obtenção de média para consolidar dados em um formato perspicaz. Torna os dados mais fáceis de analisar e compreender.
A agregação no MongoDB é composta por pipelines, que são sequências de estágios que processam dados passo a passo. Pense em como um item se moveria por uma linha de montagem.
- Pipeline: visualize a agua a fluir por uma serie de pipelines. Da mesma forma, os dados fluem por uma série de estágios. Cada estágio executa uma operação nos dados e passa os dados transformados para o próximo estágio. Esse processamento sequencial permite transformações de dados eficientes e modulares.
(Ilustração de como os dados fluiriam através do pipeline que consiste em diferentes estágios)
Nas próximas etapas, configuraremos um Atlas cluster gratuito e carregaremos um conjunto de dados defilme Mflix de amostra fornecido pelo MongoDB. Pule a próxima seção se você já souber como fazer ou se já tiver um cluster de camada grátis pronto com o banco de dados Mflix carregado.
- Cadastre-se no MongoDB Atlas, se ainda não o fez, e crie um cluster M0 camada gratuita. Se você não tiver certeza de como fazer isso, siga o tutorial passo a passo para implantar um cluster gratuito.
- Carregue o conjunto de dados de amostra fornecido pelo MongoDB chamado Mflix, que usaremos ao longo deste tutorial. Se precisar de orientação, saiba como carregar dados no Atlas.
- Verifique se você consegue se conectar ao seu cluster. Você deve conseguir ver o conjunto de dados de amostra no Compass. Está com problemas? Saiba como se conectar via Compass.
Antes de começar a escrever pipelines de agregação, Go preparar nosso próprio ambiente de desenvolvimento e garantir que temos tudo pronto para escrever nossos próprios pipelines.
Certifique-se de que sua máquina esteja sendo executada com pelo menos PHP 8.1e você tem a biblioteca PHP do MongoDB 1.17 ou mais e extensão MongoDB PHP 1.16 ou mais instalado. Siga nosso guia para instalar a biblioteca MongoDB PHP, se necessário.
Nesse ponto, você deve ter seus drivers, o cluster de banco de dados com o conjunto de dados de amostra carregados e tudo o que é necessário para executar um script PHP.
1 2 require_once __DIR__ . '/vendor/autoload.php'; 3 use MongoDB\Client; 4 use MongoDB\Driver\ServerApi; 5 6 // Replace the placeholder with your Atlas connection string 7 $uri = "<your connection string>"; 8 9 // Specify Stable API version 1 10 $apiVersion = new ServerApi(ServerApi::V1); 11 12 // Create a new client and connect to the server 13 $client = new MongoDB\Client($uri, [], ['serverApi' => $apiVersion]); 14 15 try { 16 // Send a ping to confirm a successful connection 17 $client->selectDatabase('admin')->command(['ping' => 1]); 18 echo "Pinged your deployment. You successfully connected to MongoDB!\n"; 19 } catch (Exception $e) { 20 printf($e->getMessage()); 21 } 22 23 // We will write all the aggregation examples code from here.
Esse código usa o comando ping admin para verificar se sua máquina pode se conectar ao cluster usando seu código PHP e o driver PHP do MongoDB. Usaremos o PHP CLI para verificar a saída dos itens acima e todos os scripts a seguir que executaremos para este tutorial.
1 $> php aggregation.php 2 $> Pinged your deployment. You successfully connected to MongoDB!
(Execução do bloco de código usando o PHP CLI para verificar a conectividade)
Nesta seção, usaremos pipelines de agregação para obter insights úteis do conjunto de dados de amostra Mflix que importamos no início deste tutorial. Esses exemplos darão uma ideia dos estágios de agregação mais usados e de como os dados passam de um estágio para outro. Você também verá alguns dos exemplos complexos, que podem oferecer alguma perspectiva de como você pode lidar com esses cenários em seus projetos.
Para resolver esse problema, vamos dividi-lo nas seguintes etapas:
- Agrupe usuários por um identificador exclusivo.
- Soma o total de comentários deixados por cada usuário.
- Classifique os resultados em ordem decrescente pelo total de comentários.
- Exiba os cinco principais usuários.
Vamos converter as etapas acima em uma query de agregação em PHP.
1 2 $commentsCollection = $client->sample_mflix->comments; 3 $pipeline = [ 4 [ 5 '$group' => [ 6 '_id' => '$email', // Group by user email 7 'totalComments' => ['$sum' => 1] // Sum the total comments of each user 8 ] 9 ], 10 [ 11 '$sort' => ['totalComments' => -1] // Sort by total comments in descending order 12 ], 13 [ 14 '$limit' => 5 // Limit the results to the top 5 users 15 ] 16 ]; 17 $result = $commentsCollection->aggregate($pipeline); 18 $resultArray = $result->toArray(); 19 echo "<pre>" . print_r($resultArray, true) . "</pre>";
1 Pinged your deployment. You successfully connected to MongoDB! 2 [ 3 { 4 "_id": "roger_ashton-griffiths@gameofthron.es", 5 "totalComments": 277 6 }, 7 { 8 "_id": "ron_donachie@gameofthron.es", 9 "totalComments": 260 10 }, 11 { 12 "_id": "jonathan_pryce@gameofthron.es", 13 "totalComments": 260 14 }, 15 { 16 "_id": "nathalie_emmanuel@gameofthron.es", 17 "totalComments": 258 18 }, 19 { 20 "_id": "robert_jordan@fakegmail.com", 21 "totalComments": 257 22 } 23 ]
- Agrupar usuários por e-mail
- O estágio
$group
agrupa documentos pelo campo de e-mail. - Ele cria um novo campo,
totalComments
, que é a soma dos comentários de cada usuário. $sum:1
adicionará 1 para cada registro presente com esse ID de e-mail.
- Classificar resultados
- O estágio
$sort
classifica os documentos agrupados da primeira etapa pelo campototalComments
em ordem decrescente.
- Exibir os 10 principais resultados
- O estágio
$limit
restringe a saída aos principais usuários 10 .
Para resolver esse problema, vamos dividi-lo nas seguintes etapas:
- Certifique-se de que os documentos tenham um campo
imdb.rating
e uma data de liberação. - Extraia o ano e o mês da data de lançamento.
- Agrupar e calcular a classificação média para cada mês.
- Classifique os resultados por ano e mês.
Vamos converter as etapas acima em uma query de agregação em PHP.
1 2 $moviesCollection = $client->sample_mflix->movies; 3 $pipeline = [ 4 [ 5 '$match' => [ 6 'imdb.rating' => ['$exists' => true] // Ensure that the movie has an IMDb rating 7 ] 8 ], 9 [ 10 '$addFields' => [ 11 'month' => ['$month' => '$released'], // Extract month from released date 12 'year' => ['$year' => '$released'], // Extract year from released date 13 ] 14 ], 15 [ 16 '$group' => [ 17 '_id' => [ 18 'year' => '$year', 19 'month' => '$month' 20 ], 21 'averageRating' => ['$avg' => '$imdb.rating'] // Calculate the average rating 22 ] 23 ], 24 [ 25 '$sort' => [ 26 '_id.year' => 1, 27 '_id.month' => 1 28 ] 29 ], 30 ]; 31 $result = $moviesCollection->aggregate($pipeline); 32 // Convert the result to an array and pretty print 33 $resultArray = $result->toArray(); 34 echo "<pre>" . print_r($resultArray, true) . "</pre>";
1 Pinged your deployment. You successfully connected to MongoDB! 2 [ 3 { 4 "_id": { 5 "year": 1914, 6 "month": 3 7 }, 8 "averageRating": 7.6 9 }, 10 { 11 "_id": { 12 "year": 1914, 13 "month": 9 14 }, 15 "averageRating": 7.3 16 }, 17 { 18 "_id": { 19 "year": 1914, 20 "month": 12 21 }, 22 "averageRating": 5.8 23 }, 24 { 25 "_id": { 26 "year": 1915, 27 "month": 1 28 }, 29 "averageRating": 6.4 30 }, 31 { 32 "_id": { 33 "year": 1915, 34 "month": 9 35 }, 36 "averageRating": 6.8 37 } 38 ... 39 ]
- Estágio de correspondência
- O estágio
$match
filtra documentos conforme as condições passadas. - Ele filtra documentos com um campo
imdb.rating
.
- Estágio Addfields
- O estágio
$addfields
adiciona campos de ano e mês ao campo$released
.
- Fase de grupos
- O estágio
$group
agrupa documentos pelos campos de ano e mês. $avg
calcula as classificações médias do campoimdb.rating
.
- Estágio de classificação
- O estágio
$sort
classifica os resultados em ordem crescente pelos campos de ano e mês.
Vamos usar
$facet
aqui para demonstrar como duas métricas diferentes que exigem operações/cálculos diferentes podem ser executadas na única query com o mesmo conjunto de documentos de entrada.A ideia aqui é obter os 5 filmes mais comentados e usar a mesma coleção de filmes para obter os 5 gêneros mais comentados.
Para resolver esse problema, vamos dividi-lo nas seguintes etapas:
- Podemos buscar ambas as métricas em uma única query.
- Calcule os 5 filmes mais comentados.
- Calcule os 5 gêneros mais comentado.
- Retornar os 5 filmes e gêneros mais comentados.
Vamos converter as etapas acima em uma query de agregação em PHP.
1 2 $moviesCollection = $client->sample_mflix->movies; 3 $pipeline = [ 4 [ 5 '$lookup' => [ 6 'from' => 'comments', 7 'localField' => '_id', 8 'foreignField' => 'movie_id', 9 'as' => 'comments' 10 ] 11 ], 12 [ 13 '$facet' => [ 14 'mostCommentedMovies' => [ 15 [ 16 '$project' => [ 17 '_id' => 0, 18 'title' => 1, 19 'comment_count' => ['$size' => '$comments'] 20 ] 21 ], 22 [ 23 '$sort' => ['comment_count' => -1] 24 ], 25 [ 26 '$limit' => 5 27 ] 28 ], 29 'mostCommentedGenre' => [ 30 [ 31 '$unwind' => '$genres' 32 ], 33 [ 34 '$group' => [ 35 '_id' => '$genres', 36 'totalComments' => ['$sum' => ['$size' => '$comments']] 37 ] 38 ], 39 [ 40 '$sort' => ['totalComments' => -1] 41 ], 42 [ 43 '$limit' => 5 44 ] 45 ], 46 ] 47 ] 48 ]; 49 $result = $moviesCollection->aggregate($pipeline); 50 // Convert the result to an array and pretty print 51 $resultArray = $result->toArray(); 52 echo "<pre>" . print_r($resultArray, true) . "</pre>";
1 Pinged your deployment. You successfully connected to MongoDB! 2 [{ 3 "mostCommentedMovies": [ 4 { 5 "title": "The Taking of Pelham 1 2 3", 6 "comment_count": 161 7 }, 8 { 9 "title": "Ocean's Eleven", 10 "comment_count": 158 11 }, 12 { 13 "title": "About a Boy", 14 "comment_count": 158 15 }, 16 { 17 "title": "Terminator Salvation", 18 "comment_count": 158 19 }, 20 { 21 "title": "50 First Dates", 22 "comment_count": 158 23 } 24 ], 25 "mostCommentedGenre": [ 26 { 27 "_id": "Comedy", 28 "totalComments": 15435 29 }, 30 { 31 "_id": "Adventure", 32 "totalComments": 12363 33 }, 34 { 35 "_id": "Drama", 36 "totalComments": 12339 37 }, 38 { 39 "_id": "Action", 40 "totalComments": 11419 41 }, 42 { 43 "_id": "Fantasy", 44 "totalComments": 6955 45 } 46 ] 47 }]
- Estágio de pesquisa
- O estágio
$lookup
une a coleção de filmes à coleção de comentários no campo_id
para buscar comentários.
- O estágio
$facet
permite executar vários pipelines de agregação em um único estágio e pode ser usado para derivar diferentes métricas e visualizações do mesmo conjunto de dados simultaneamente. - O resultado da união é armazenado no campo de filme.
- pipeline de métrica mostCommentedMovies
- O estágio
$project
permite adicionar novos campos ao pipeline e também oferece flexibilidade de mostrar ou ocultar campos. Ocultamos o campo_id
, mantivemos o campotitle
e adicionamos o campocomment_count
. - O estágio
$sort
classifica os usuários pelo número de contagens de comentários em ordem crescente. - O estágio
$limit
limita a saída a cinco documentos.
- pipeline de métrica mostCommentedGenres
- O estágio
$unwind
nivela a matriz de gêneros para registros individuais. - O estágio
$group
agrupa os documentos por gênero e calcula o total de comentários para esse gênero específico. - O estágio
$sort
classifica a saída dos gêneros por número total de comentários. - O estágio
$limit
limita a saída a cinco documentos.
Para resolver esse problema, vamos dividi-lo nas seguintes etapas:
- A preferência de conteúdo pode ser identificada com o engajamento do usuário.
- Mais comentários sobre os gêneros de filmes podem ajudar a entender as preferências de uso dos gêneros de filmes.
Vamos converter as etapas acima em uma query de agregação em PHP.
1 2 $commentsCollection = $client->sample_mflix->comments; 3 $pipeline = [ 4 ['$lookup' => [ 5 'from' => 'movies', 6 'localField' => 'movie_id', 7 'foreignField' => '_id', 8 'as' => 'movie' 9 ]], 10 ['$unwind' => '$movie'], 11 ['$replaceRoot' => ['newRoot' => [ 12 'comment_id' => '$_id', 13 'user' => '$name', 14 'movie_id' => '$movie_id', 15 'movie_title' => '$movie.title', 16 'genres' => '$movie.genres', 17 'rating' => '$movie.imdb.rating', 18 'comment' => '$text' 19 ]]], 20 ['$unwind' => '$genres'], 21 ['$group' => [ 22 '_id' => [ 23 'user' => '$user', 24 'genre' => '$genres' 25 ], 26 'totalComments' => ['$sum' => 1], 27 'averageRating' => ['$avg' => '$rating'] 28 ]], 29 ['$group' => [ 30 '_id' => '$_id.user', 31 'preferences' => [ 32 '$push' => [ 33 'genre' => '$_id.genre', 34 'totalComments' => '$totalComments', 35 'averageRating' => ['$round' => ['$avg' => '$averageRating', 2]] 36 ] 37 ], 38 'totalComments' => ['$sum' => '$totalComments'] 39 ]], 40 ['$sort' => ['totalComments' => -1]] 41 ]; 42 43 $result = $commentsCollection->aggregate($pipeline); 44 // Convert the result to an array and pretty print 45 $resultArray = $result->toArray(); 46 echo "<pre>" . print_r($resultArray, true) . "</pre>";
- Estágio de pesquisa
- O estágio
$lookup
realiza uma união esquerda com a coleção de comentários e a coleção de filmes nomovie_id
para buscar detalhes do filme.
- Unwind stage
- O estágio
$unwind
nivela a matriz de filmes unidos e a matriz de gêneros para tratar cada gênero separadamente.
- Estágio Replaceroot
- O estágio
$replaceRoot
substitui o documento raiz por uma nova estrutura que inclui informações do usuário, do filme, do gênero, da classificação e de comentários.
- Primeira fase de grupos
- Agrupa os documentos por usuário e gênero, calculando: i.
totalComments
, o número total de comentários feitos por cada usuário em filmes de cada gênero. II.averageRating
, a classificação média fornecida pelo usuário em filmes de cada gênero.
- Segunda etapa do grupo
- Isso agrupa os documentos novamente por usuário, agregando as preferências de cada gênero em uma array. Ele também calcula o número total de comentários feitos pelo usuário em todos os gêneros.
- Estágio de classificação
- O estágio
$sort
classifica os resultados por total de comentários em ordem decrescente.
Para resolver esse problema, vamos dividi-lo nas seguintes etapas:
- Quando um usuário fizer um comentário, envie uma notificação.
- Quando um usuário fizer um comentário, atualize também um feed ativo.
Vamos converter as etapas acima em uma query de agregação em PHP.
1 2 $commentsCollection = $client->sample_mflix->comments; 3 $liveFeedCollection = $client->sample_mflix->live_feed; 4 $changeStream = $commentsCollection->watch(); 5 6 function sendNotification($comment) { 7 printf("Notification sent for new comment by %s on movie ID %s\n", $comment['name'], $comment['movie_id']); 8 } 9 10 function updateLiveFeed($comment, $liveFeedCollection) { 11 $liveFeedDocument = [ 12 'user' => $comment['name'], 13 'movie_id' => $comment['movie_id'], 14 'comment' => $comment['text'], 15 'timestamp' => new MongoDB\BSON\UTCDateTime() 16 ]; 17 $liveFeedCollection->insertOne($liveFeedDocument); 18 printf("Live feed updated with new comment by %s\n", $comment['name']); 19 } 20 21 for ($changeStream->rewind(); true; $changeStream->next()) { 22 if (!$changeStream->valid()) { 23 continue; 24 } 25 26 $event = $changeStream->current(); 27 28 if ($event['operationType'] === 'invalidate') { 29 break; 30 } 31 32 if ($event['operationType'] === 'insert') { 33 $newComment = $event['fullDocument']; 34 sendNotification($newComment); 35 updateLiveFeed($newComment, $liveFeedCollection); 36 } 37 }
- Notificação: Quando um novo comentário é adicionado, a função
sendNotification
é chamada, que envia uma notificação - Atualização de feedem tempo real: a função
updateLiveFeed
insere o novo comentário na collectionlive_feed
, que pode ser usada para exibir atividades recentes em um website ou painel de um aplicativo.
- Alterar estágio do stream
- O estágio
$changestream
abre um fluxo de alterações na coleção de comentários, que escuta quaisquer alterações em tempo real.
- função sendNotification
- Esta função de espaço reservado simula o envio de uma notificação sempre que um novo comentário é adicionado.
- função updateLiveFeed
- Isso atualiza a coleção
live_feed
com o novo comentário para manter um registro das últimas atividades.
- Para loop
- Isso se repete no fluxo de alterações, processando cada alteração. Se o tipo de alteração for uma inserção, ele acionará a função de notificação e atualizará o feed em tempo real.
Os exemplos que vimos até agora abrangem os estágios do pipeline de agregação e os operadores usados em casos ideais. Desde a análise de métricas de engajamento do usuário até a execução de atualizações em tempo real sobre a ação de um usuário, o pipeline de agregação do MongoDB permite a transformação de dados complexos e análises perspicazes.
Embora tenhamos abordado alguns estágios essenciais para agregar e analisar dados, o MongoDB oferece uma ampla gama de estágios de pipeline de agregação além do que é abordado. Para se afundar em estágios e análises mais avançados, consulte a documentação oficial do MongoDB e a documentaçãooficial do driver PHP.
Amplie sua compreensão e aproveite a estrutura de agregação do MongoDB de forma eficiente em PHP para atender a diversos requisitos de processamento de dados e aprimorar os recursos do seu aplicativo.
Se tiver alguma dúvida ou quiser discutir esse ou quaisquer novos casos de uso, pode me procurar no LinkedIn.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.