Complex Aggregation Pipelines with Vanilla PHP and MongoDB
In the world of data, processing large volumes is both a challenge and a necessity. Whether it's analysing sales data to identify trends, summarising user activities to provide a personalised experience, or transforming raw data into meaningful insights, the ability to process and aggregate data is crucial. Let's understand this with an example.
Imagine you've got a movie dataset, a collection that includes extensive information about movies, user reviews, and ratings. To improve user experience and engagement, you might want to identify the most popular genres, understand the average ratings of the movies, determine the review-to-comment ratio, and so on.
To get these insights, writing queries for individual metrics would be inefficient and complex. Also, processing large datasets on the fly would be a compute-intensive task. This is where aggregation becomes crucial.
In the next section, let's understand what aggregation is, how it is achieved in MongoDB, and what pipelines and stages are from a theoretical standpoint. Then we will dive into some hands-on exercises with the PHP driver.
Aggregation is a process of transforming and summarising large sets of data to extract meaningful insights. It involves performing operations like counting, grouping, and averaging to consolidate data into an insightful format. It makes data easier to analyse and understand.
Aggregation in MongoDB is made up of pipelines, which are sequences of stages that process data step-by-step. Think of how an item would move through an assembly line.
- Pipeline: Imagine water flowing through a series of pipes. Similarly, data flows through a series of stages. Each stage performs an operation on the data and passes the transformed data to the next stage. This sequential processing allows for efficient and modular data transformations.
(Illustration of how data would flow through the pipeline consisting of different stages)
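To make the idea of sequential stages concrete, here is a minimal sketch in plain PHP, with no MongoDB involved. The `runPipeline` function and the sample movies are made up for illustration; each closure only stands in for what a real stage such as `$match`, `$sort`, or `$project` would do on the server.

```php
<?php
// Illustration only: plain PHP closures standing in for pipeline stages.
// None of these names come from the MongoDB library.

/** Run $documents through each stage in order, like an aggregation pipeline. */
function runPipeline(array $documents, array $stages): array
{
    return array_reduce(
        $stages,
        fn (array $docs, callable $stage): array => $stage($docs),
        $documents
    );
}

// Made-up sample documents.
$movies = [
    ['title' => 'A', 'rating' => 7.5],
    ['title' => 'B', 'rating' => 5.0],
    ['title' => 'C', 'rating' => 8.2],
];

$stages = [
    // Stage 1: keep well-rated movies (similar in spirit to $match)
    fn (array $docs): array => array_values(array_filter($docs, fn ($d) => $d['rating'] >= 7)),
    // Stage 2: order by rating, highest first (similar in spirit to $sort)
    function (array $docs): array {
        usort($docs, fn ($a, $b) => $b['rating'] <=> $a['rating']);
        return $docs;
    },
    // Stage 3: keep only the title (similar in spirit to $project)
    fn (array $docs): array => array_map(fn ($d) => ['title' => $d['title']], $docs),
];

$output = runPipeline($movies, $stages);
print_r($output);
```

Each stage receives only what the previous stage returned, which is why the order of stages matters: filtering first means the later stages have fewer documents to handle.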
In the next steps, we will set up a free Atlas cluster and load the sample Mflix movie dataset provided by MongoDB. Skip the next section if you already have a free tier cluster with the Mflix database loaded.
- Sign up for MongoDB Atlas if you haven't already, and create a free tier M0 cluster. If you're not sure how to do it, follow the step-by-step tutorial to deploy a free cluster.
- Load the sample dataset provided by MongoDB called Mflix, which we will use throughout this tutorial. If you need guidance, learn how to load data into Atlas.
- Check that you're able to connect to your cluster. You should be able to see the sample dataset in Compass. Having trouble? Learn how to connect via Compass.
Before we start writing aggregation pipelines, let's prepare our own development environment and make sure we have everything ready to go for writing our own pipelines.
Make sure your machine is running PHP 8.1 or later, and that you have version 1.17 or later of the MongoDB PHP library and version 1.16 or later of the MongoDB PHP extension installed. Follow our guide to install the MongoDB PHP library, if needed.
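If you are unsure which versions you have, a quick check along these lines may help. It is a sketch rather than an official tool: `meetsMinimum` and the two constants are names introduced here, and the minimum versions are simply the ones stated above.

```php
<?php
// Minimum versions taken from the requirements stated above.
const MIN_PHP = '8.1.0';
const MIN_EXTENSION = '1.16.0';

/** True when $installed is at least $required (uses PHP's version_compare). */
function meetsMinimum(string $installed, string $required): bool
{
    return version_compare($installed, $required, '>=');
}

printf("PHP %s: %s\n", PHP_VERSION, meetsMinimum(PHP_VERSION, MIN_PHP) ? 'ok' : 'too old');

// phpversion('mongodb') returns false when the extension is not loaded.
$extensionVersion = phpversion('mongodb');
if ($extensionVersion === false) {
    echo "mongodb extension: not loaded\n";
} else {
    printf(
        "mongodb extension %s: %s\n",
        $extensionVersion,
        meetsMinimum($extensionVersion, MIN_EXTENSION) ? 'ok' : 'too old'
    );
}
```

The library version is managed by Composer, so running `composer show mongodb/mongodb` in your project directory should report it.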
At this point, you should have your drivers, database cluster with sample dataset loaded, and everything required for executing a PHP script.
```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\Client;
use MongoDB\Driver\ServerApi;

// Replace the placeholder with your Atlas connection string
$uri = "<your connection string>";

// Specify Stable API version 1
$apiVersion = new ServerApi(ServerApi::V1);

// Create a new client and connect to the server
$client = new Client($uri, [], ['serverApi' => $apiVersion]);

try {
    // Send a ping to confirm a successful connection
    $client->selectDatabase('admin')->command(['ping' => 1]);
    echo "Pinged your deployment. You successfully connected to MongoDB!\n";
} catch (Exception $e) {
    // Print the message as-is; printf() would treat any "%" in it as a format specifier
    echo $e->getMessage(), "\n";
}

// We will write all the aggregation examples code from here.
```
This code uses the ping admin command to check whether your machine can connect to the cluster using your PHP code and the MongoDB PHP driver. We will use PHP CLI to check the output of the above and all the following scripts we will execute for this tutorial.
```
$> php aggregation.php
Pinged your deployment. You successfully connected to MongoDB!
```
(Running code block using PHP CLI for checking connectivity)
In this section, we will use aggregation pipelines to get useful insights from the sample Mflix dataset that we imported at the beginning of this tutorial. These examples will give you an idea of the most commonly used aggregation stages and how data passes from one stage to the next. You will also see some more complex examples, which can offer some perspective on how you could handle such scenarios in your projects.
To find the five users who have left the most comments, we will break the problem down into the following steps:
- Group users by a unique identifier.
- Sum the total comments left by each user.
- Sort the results in descending order by the total comments.
- Display the top five users.
Let's convert the above steps into an aggregation query in PHP.
```php
$commentsCollection = $client->sample_mflix->comments;
$pipeline = [
    [
        '$group' => [
            '_id' => '$email', // Group by user email
            'totalComments' => ['$sum' => 1] // Sum the total comments of each user
        ]
    ],
    [
        '$sort' => ['totalComments' => -1] // Sort by total comments in descending order
    ],
    [
        '$limit' => 5 // Limit the results to the top 5 users
    ]
];
$result = $commentsCollection->aggregate($pipeline);
$resultArray = $result->toArray();
echo "<pre>" . print_r($resultArray, true) . "</pre>";
```
```
Pinged your deployment. You successfully connected to MongoDB!
[
  {
    "_id": "roger_ashton-griffiths@gameofthron.es",
    "totalComments": 277
  },
  {
    "_id": "ron_donachie@gameofthron.es",
    "totalComments": 260
  },
  {
    "_id": "jonathan_pryce@gameofthron.es",
    "totalComments": 260
  },
  {
    "_id": "nathalie_emmanuel@gameofthron.es",
    "totalComments": 258
  },
  {
    "_id": "robert_jordan@fakegmail.com",
    "totalComments": 257
  }
]
```
- Group users by email
  - The `$group` stage groups documents by the email field.
  - It creates a new field, `totalComments`, which holds the number of comments left by each user. `'$sum' => 1` adds 1 for every document with that email address.
- Sort results
  - The `$sort` stage sorts the grouped documents from the first step by the `totalComments` field in descending order.
- Display top five results
  - The `$limit` stage restricts the output to the top five users.
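To see what these three stages do to the data, here is a rough plain-PHP equivalent run on a handful of made-up comments. It is only an illustration of the logic; the server performs the real work when you call `aggregate()`, and the email addresses below are hypothetical.

```php
<?php
// Hypothetical in-memory stand-in for a few documents from the comments collection.
$comments = [
    ['email' => 'a@example.com'],
    ['email' => 'b@example.com'],
    ['email' => 'a@example.com'],
    ['email' => 'c@example.com'],
    ['email' => 'a@example.com'],
    ['email' => 'b@example.com'],
];

// $group with '$sum' => 1: add 1 to the running total for each document's email.
$totals = [];
foreach ($comments as $comment) {
    $totals[$comment['email']] = ($totals[$comment['email']] ?? 0) + 1;
}

// $sort with -1: order by total, highest first (arsort keeps the email keys).
arsort($totals);

// $limit: keep only the first two entries.
$top = array_slice($totals, 0, 2, true);

print_r($top);
```

Doing this in PHP would require pulling every comment over the network first, which is the cost the aggregation pipeline avoids by grouping on the server.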
To find the average IMDb rating of movies by release year and month, we will break the problem down into the following steps:
- Ensure that the documents have an `imdb.rating` field and a released date.
- Extract the year and month from the released date.
- Group and calculate the average rating for each month.
- Sort the results by year and month.
Let's convert the above steps into an aggregation query in PHP.
```php
$moviesCollection = $client->sample_mflix->movies;
$pipeline = [
    [
        '$match' => [
            'imdb.rating' => ['$exists' => true] // Ensure that the movie has an IMDb rating
        ]
    ],
    [
        '$addFields' => [
            'month' => ['$month' => '$released'], // Extract month from released date
            'year' => ['$year' => '$released'], // Extract year from released date
        ]
    ],
    [
        '$group' => [
            '_id' => [
                'year' => '$year',
                'month' => '$month'
            ],
            'averageRating' => ['$avg' => '$imdb.rating'] // Calculate the average rating
        ]
    ],
    [
        '$sort' => [
            '_id.year' => 1,
            '_id.month' => 1
        ]
    ],
];
$result = $moviesCollection->aggregate($pipeline);
// Convert the result to an array and pretty print
$resultArray = $result->toArray();
echo "<pre>" . print_r($resultArray, true) . "</pre>";
```
```
Pinged your deployment. You successfully connected to MongoDB!
[
  {
    "_id": {
      "year": 1914,
      "month": 3
    },
    "averageRating": 7.6
  },
  {
    "_id": {
      "year": 1914,
      "month": 9
    },
    "averageRating": 7.3
  },
  {
    "_id": {
      "year": 1914,
      "month": 12
    },
    "averageRating": 5.8
  },
  {
    "_id": {
      "year": 1915,
      "month": 1
    },
    "averageRating": 6.4
  },
  {
    "_id": {
      "year": 1915,
      "month": 9
    },
    "averageRating": 6.8
  }
  ...
]
```
- Match stage
  - The `$match` stage filters documents according to the conditions passed.
  - Here, it keeps only the documents that have an `imdb.rating` field.
- AddFields stage
  - The `$addFields` stage adds `year` and `month` fields, extracted from the `released` field.
- Group stage
  - The `$group` stage groups documents by the year and month fields.
  - `$avg` calculates the average of the `imdb.rating` field for each group.
- Sort stage
  - The `$sort` stage sorts the results in ascending order by the year and month fields.
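The first step in the list mentions a released date, while the `$match` stage in the script only checks for `imdb.rating`. If you want the filter to enforce both, a stricter variant might look like the sketch below. The helper name `ratedAndReleasedMatchStage` is made up, and whether the sample data actually contains documents that this would exclude is not something verified here.

```php
<?php
/**
 * Builds a $match stage that requires a numeric IMDb rating and a BSON date
 * in `released`. Hypothetical helper, not part of the article's script.
 */
function ratedAndReleasedMatchStage(): array
{
    return [
        '$match' => [
            'imdb.rating' => ['$type' => 'number'], // skips missing or non-numeric ratings
            'released' => ['$type' => 'date'],      // skips missing or non-date values
        ],
    ];
}

$stage = ratedAndReleasedMatchStage();
echo json_encode($stage, JSON_PRETTY_PRINT), "\n";
```

To try it, you could replace the first element of `$pipeline` with `ratedAndReleasedMatchStage()` and compare the output with the original run.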
We are going to use `$facet` here to demonstrate how two different metrics that require different operations can be computed in a single query from the same set of input documents. The idea is to get the top five most commented movies and, from the same movies collection, the top five most commented genres.
To solve this problem, we will break it down into the following stages:
- We can fetch both metrics in a single query.
- Calculate the top 5 most commented movies.
- Calculate the top 5 most commented genres.
- Return the top 5 most commented movies and genres.
Let's convert the above steps into an aggregation query in PHP.
```php
$moviesCollection = $client->sample_mflix->movies;
$pipeline = [
    [
        '$lookup' => [
            'from' => 'comments',
            'localField' => '_id',
            'foreignField' => 'movie_id',
            'as' => 'comments'
        ]
    ],
    [
        '$facet' => [
            'mostCommentedMovies' => [
                [
                    '$project' => [
                        '_id' => 0,
                        'title' => 1,
                        'comment_count' => ['$size' => '$comments']
                    ]
                ],
                [
                    '$sort' => ['comment_count' => -1]
                ],
                [
                    '$limit' => 5
                ]
            ],
            'mostCommentedGenre' => [
                [
                    '$unwind' => '$genres'
                ],
                [
                    '$group' => [
                        '_id' => '$genres',
                        'totalComments' => ['$sum' => ['$size' => '$comments']]
                    ]
                ],
                [
                    '$sort' => ['totalComments' => -1]
                ],
                [
                    '$limit' => 5
                ]
            ],
        ]
    ]
];
$result = $moviesCollection->aggregate($pipeline);
// Convert the result to an array and pretty print
$resultArray = $result->toArray();
echo "<pre>" . print_r($resultArray, true) . "</pre>";
```
```
Pinged your deployment. You successfully connected to MongoDB!
[{
  "mostCommentedMovies": [
    {
      "title": "The Taking of Pelham 1 2 3",
      "comment_count": 161
    },
    {
      "title": "Ocean's Eleven",
      "comment_count": 158
    },
    {
      "title": "About a Boy",
      "comment_count": 158
    },
    {
      "title": "Terminator Salvation",
      "comment_count": 158
    },
    {
      "title": "50 First Dates",
      "comment_count": 158
    }
  ],
  "mostCommentedGenre": [
    {
      "_id": "Comedy",
      "totalComments": 15435
    },
    {
      "_id": "Adventure",
      "totalComments": 12363
    },
    {
      "_id": "Drama",
      "totalComments": 12339
    },
    {
      "_id": "Action",
      "totalComments": 11419
    },
    {
      "_id": "Fantasy",
      "totalComments": 6955
    }
  ]
}]
```
- Lookup stage
  - The `$lookup` stage joins the movies collection with the comments collection, matching the movie's `_id` field against the comment's `movie_id` field, to fetch comments.
  - The result of the join gets stored in the `comments` field.
- Facet stage
  - The `$facet` stage allows running multiple aggregation pipelines in a single stage and can be used for deriving different metrics and views from the same dataset simultaneously.
- mostCommentedMovies metric pipeline
  - The `$project` stage allows adding new fields to the pipeline and also offers the flexibility of showing or hiding fields. We have hidden the `_id` field, kept the `title` field, and added the `comment_count` field.
  - The `$sort` stage sorts the movies by `comment_count` in descending order.
  - The `$limit` stage limits the output to five documents.
- mostCommentedGenre metric pipeline
  - The `$unwind` stage flattens the genres array into individual records.
  - The `$group` stage groups the documents by genre and calculates the total comments for that specific genre.
  - The `$sort` stage sorts the genres by total number of comments in descending order.
  - The `$limit` stage limits the output to five documents.
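The genre pipeline is the less obvious of the two, so here is a small plain-PHP sketch of what `$unwind` followed by `$group` amounts to. The three movies and the `commentCount` field are made up for illustration; in the real pipeline the count comes from `$size` applied to the joined `comments` array.

```php
<?php
// Hypothetical movies after the $lookup, with the joined comments reduced to a count.
$movies = [
    ['title' => 'Movie A', 'genres' => ['Comedy', 'Drama'], 'commentCount' => 3],
    ['title' => 'Movie B', 'genres' => ['Comedy'], 'commentCount' => 2],
    ['title' => 'Movie C', 'genres' => ['Drama', 'Action'], 'commentCount' => 4],
];

$totals = [];
foreach ($movies as $movie) {
    // $unwind: treat the movie as one document per genre.
    foreach ($movie['genres'] as $genre) {
        // $group + $sum: add this movie's comment count to its genre's total.
        $totals[$genre] = ($totals[$genre] ?? 0) + $movie['commentCount'];
    }
}

arsort($totals); // $sort by total comments, highest first
print_r($totals);
```

Note that a movie with several genres contributes its comments to each of them, so the genre totals add up to more than the number of comments in the collection.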
To understand each user's content preferences, we will rely on the following observations:
- Content preference can be identified from user engagement.
- The more a user comments on movies of a genre, the stronger that user's preference for the genre is likely to be.
Let's convert the above steps into an aggregation query in PHP.
```php
$commentsCollection = $client->sample_mflix->comments;
$pipeline = [
    ['$lookup' => [
        'from' => 'movies',
        'localField' => 'movie_id',
        'foreignField' => '_id',
        'as' => 'movie'
    ]],
    ['$unwind' => '$movie'],
    ['$replaceRoot' => ['newRoot' => [
        'comment_id' => '$_id',
        'user' => '$name',
        'movie_id' => '$movie_id',
        'movie_title' => '$movie.title',
        'genres' => '$movie.genres',
        'rating' => '$movie.imdb.rating',
        'comment' => '$text'
    ]]],
    ['$unwind' => '$genres'],
    ['$group' => [
        '_id' => [
            'user' => '$user',
            'genre' => '$genres'
        ],
        'totalComments' => ['$sum' => 1],
        'averageRating' => ['$avg' => '$rating']
    ]],
    ['$group' => [
        '_id' => '$_id.user',
        'preferences' => [
            '$push' => [
                'genre' => '$_id.genre',
                'totalComments' => '$totalComments',
                // $round takes [expression, decimal places]; the average was already computed above
                'averageRating' => ['$round' => ['$averageRating', 2]]
            ]
        ],
        'totalComments' => ['$sum' => '$totalComments']
    ]],
    ['$sort' => ['totalComments' => -1]]
];

$result = $commentsCollection->aggregate($pipeline);
// Convert the result to an array and pretty print
$resultArray = $result->toArray();
echo "<pre>" . print_r($resultArray, true) . "</pre>";
```
- Lookup stage
  - The `$lookup` stage performs a left outer join from the comments collection to the movies collection on `movie_id` to fetch movie details.
- Unwind stage
  - The `$unwind` stage is used twice: first to flatten the joined movie array, and then to flatten the genres array so that each genre is treated separately.
- ReplaceRoot stage
  - The `$replaceRoot` stage replaces the root document with a new structure that includes user, movie details, genre, rating, and comment information.
- First Group stage
  - Groups the documents by user and genre, calculating:
    i. `totalComments`, the total number of comments made by each user on movies of each genre.
    ii. `averageRating`, the average IMDb rating of the movies the user commented on in each genre.
- Second Group stage
  - This groups the documents again by user, aggregating the preferences for each genre into an array. It also calculates the total number of comments made by the user across all genres.
- Sort stage
  - The `$sort` stage sorts the results by total comments in descending order.
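Once you have the results, you will probably want to read something out of the nested `preferences` array. The sketch below picks the genre a user comments on most. `favouriteGenre` is a hypothetical helper and the sample document is made up; its shape simply mirrors the `$push` in the second `$group` stage.

```php
<?php
/**
 * Returns the genre with the most comments from one user's `preferences`,
 * or null when there are none. Hypothetical helper; the expected shape
 * mirrors the $push in the second $group stage.
 */
function favouriteGenre(iterable $preferences): ?string
{
    $best = null;
    foreach ($preferences as $preference) {
        if ($best === null || $preference['totalComments'] > $best['totalComments']) {
            $best = $preference;
        }
    }
    return $best === null ? null : $best['genre'];
}

// Made-up sample in the shape produced by the pipeline.
$user = [
    '_id' => 'Example User',
    'preferences' => [
        ['genre' => 'Drama', 'totalComments' => 12, 'averageRating' => 7.1],
        ['genre' => 'Comedy', 'totalComments' => 30, 'averageRating' => 6.4],
        ['genre' => 'Action', 'totalComments' => 9, 'averageRating' => 6.9],
    ],
    'totalComments' => 51,
];

printf("%s comments most on %s\n", $user['_id'], favouriteGenre($user['preferences']));
```

The `iterable` type hint is there so that the array-like objects returned by the driver could be passed in as well as plain arrays; that is an assumption about how you fetch your results, so adjust it to your setup.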
To react to new comments in real time, we will break the problem down into the following steps:
- When a user makes a comment, send a notification.
- When a user makes a comment, update a live feed, as well.
Let's convert the above steps into PHP code that uses a change stream.
```php
$commentsCollection = $client->sample_mflix->comments;
$liveFeedCollection = $client->sample_mflix->live_feed;
$changeStream = $commentsCollection->watch();

function sendNotification($comment) {
    printf("Notification sent for new comment by %s on movie ID %s\n", $comment['name'], $comment['movie_id']);
}

function updateLiveFeed($comment, $liveFeedCollection) {
    $liveFeedDocument = [
        'user' => $comment['name'],
        'movie_id' => $comment['movie_id'],
        'comment' => $comment['text'],
        'timestamp' => new MongoDB\BSON\UTCDateTime()
    ];
    $liveFeedCollection->insertOne($liveFeedDocument);
    printf("Live feed updated with new comment by %s\n", $comment['name']);
}

for ($changeStream->rewind(); true; $changeStream->next()) {
    if (!$changeStream->valid()) {
        continue;
    }

    $event = $changeStream->current();

    if ($event['operationType'] === 'invalidate') {
        break;
    }

    if ($event['operationType'] === 'insert') {
        $newComment = $event['fullDocument'];
        sendNotification($newComment);
        updateLiveFeed($newComment, $liveFeedCollection);
    }
}
```
- Notification: When a new comment is added, the `sendNotification` function is called, which sends a notification.
- Live feed update: The `updateLiveFeed` function inserts the new comment into the `live_feed` collection, which could be used to display recent activities on a website or application dashboard.
- Change stream
  - The `watch()` method opens a change stream on the comments collection, using the `$changeStream` stage under the hood, and listens for any changes in real time.
- sendNotification function
  - This placeholder function simulates sending a notification whenever a new comment is added.
- updateLiveFeed function
  - This updates the `live_feed` collection with the new comment to keep a record of the latest activities.
- For loop
  - This iterates over the change stream, processing each change. If the change type is an insert, it triggers the notification function and updates the live feed. An invalidate event ends the loop.
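Since `watch()` accepts an aggregation pipeline, the filtering on insert events could also be pushed to the server. The following is a sketch of that idea rather than part of the script above; `insertOnlyPipeline` is a made-up helper name, and the stage inside it is an ordinary `$match`.

```php
<?php
/**
 * Builds a pipeline that limits a change stream to insert events.
 * Hypothetical helper name; the stage itself is a regular $match.
 */
function insertOnlyPipeline(): array
{
    return [
        ['$match' => ['operationType' => 'insert']],
    ];
}

$pipeline = insertOnlyPipeline();
echo json_encode($pipeline), "\n";

// Assumed usage with the $commentsCollection from the script above:
// $changeStream = $commentsCollection->watch($pipeline);
```

With a filter like this, events for updates and deletes would not be sent to your script at all, which may matter on a busy collection.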
The examples we have seen thus far cover aggregation pipeline stages and operators that come up in common use cases. From analysing user engagement metrics to performing real-time updates upon a user's action, MongoDB's aggregation pipeline allows for complex data transformation and insightful analytics.
While we've covered a few stages essential for aggregating and analysing data, MongoDB offers a wide array of aggregation pipeline stages beyond what is covered. To delve deeper into more advanced stages and analysis, refer to the official MongoDB documentation and the official PHP driver documentation.
Expand your understanding and leverage MongoDB's aggregation framework efficiently in PHP to address diverse data processing requirements and enhance your application's capabilities.
If you have any doubts or questions, or want to discuss this or any new use cases further, you can reach out to me on LinkedIn.