Complex Aggregation Pipelines with Vanilla PHP and MongoDB

Viraj Thakrar • 10 min read • Published Sep 05, 2024 • Updated Sep 05, 2024
In the world of data, processing large volumes is both a challenge and a necessity. Whether it's analysing sales data to identify trends, summarising user activities to provide a personalised experience, or transforming raw data into meaningful insights, the ability to process and aggregate data is crucial. Let's look at an example.
Imagine you have a movie dataset: a collection with extensive information about movies, user reviews, and ratings. To improve user experience and engagement, you might want to identify the most popular genres, find the average ratings of movies, determine the ratio of reviews to comments, and so on.
Writing a separate query for each metric and combining the results in your application would be inefficient and complex, and pulling large datasets into the application to process them on the fly is compute-intensive. This is where aggregation becomes crucial: the database does the processing and returns only the summarised result.
In the next section, we will look at what aggregation is, how MongoDB implements it, and what pipelines and stages are from a theoretical standpoint. Then we will dive into some hands-on exercises with the MongoDB PHP driver.

What is aggregation?

Aggregation is a process of transforming and summarising large sets of data to extract meaningful insights. It involves performing operations like counting, grouping, and averaging to consolidate data into an insightful format. It makes data easier to analyse and understand.
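Aggregation in MongoDB is expressed as a pipeline: a sequence of stages that process data step by step. Think of an item moving along an assembly line: each stage receives the documents output by the previous stage, transforms them (for example, by filtering, grouping, sorting, or reshaping), and passes its output on to the next stage.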
(Figure: illustration of how data flows through a pipeline consisting of different stages)
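To make the assembly-line idea concrete, here is a minimal plain-PHP sketch that needs no database: it models a pipeline as an ordered list of stage functions, each receiving the documents produced by the previous one. The runPipeline() helper and the sample documents are hypothetical, and the three closures only imitate $match, $sort, and $limit. MongoDB runs real pipelines on the server, so treat this purely as a mental model.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: apply each stage, in order, to the output of the previous stage.
 *
 * @param array<int, array<string, mixed>> $documents
 * @param array<int, callable> $stages
 * @return array<int, array<string, mixed>>
 */
function runPipeline(array $documents, array $stages): array
{
    return array_reduce(
        $stages,
        fn (array $docs, callable $stage): array => $stage($docs),
        $documents
    );
}

// Hypothetical sample documents.
$movies = [
    ['title' => 'A', 'year' => 1999, 'rating' => 7.1],
    ['title' => 'B', 'year' => 2004, 'rating' => 8.3],
    ['title' => 'C', 'year' => 2010, 'rating' => 6.4],
];

$stages = [
    // Imitates $match: keep movies released after 2000.
    fn (array $docs): array => array_values(array_filter($docs, fn (array $d): bool => $d['year'] > 2000)),
    // Imitates $sort: order by rating, highest first.
    function (array $docs): array {
        usort($docs, fn (array $a, array $b): int => $b['rating'] <=> $a['rating']);
        return $docs;
    },
    // Imitates $limit: keep only the first document.
    fn (array $docs): array => array_slice($docs, 0, 1),
];

$result = runPipeline($movies, $stages);
echo $result[0]['title'], "\n"; // prints "B"
```

Order matters here just as it does in a real pipeline: moving the $limit-style closure to the front keeps only movie A, which the filter then discards, leaving an empty result.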
In the next steps, we will set up a free Atlas cluster and load the sample Mflix movie dataset provided by MongoDB. Skip the next section if you already have a free tier cluster with the Mflix database (sample_mflix) loaded.

Prepare a free tier cluster with sample dataset

  1. Sign up for MongoDB Atlas if you haven't already, and create a free tier M0 cluster. If you're not sure how to do it, follow the step-by-step tutorial to deploy a free cluster.
  2. Load the sample dataset provided by MongoDB, which includes the Mflix database (sample_mflix) that we will use throughout this tutorial. If you need guidance, learn how to load data into Atlas.
  3. Check that you're able to connect to your cluster. You should be able to see the sample dataset in Compass. Having trouble? Learn how to connect via Compass.

Prepare your own development environment

Before we start writing aggregation pipelines, let's make sure our development environment has everything we need.
Make sure your machine runs PHP 8.1 or later, with the MongoDB PHP library 1.17 or later and the MongoDB PHP extension 1.16 or later installed. Follow our guide to install the MongoDB PHP library, if needed.
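If you want PHP itself to confirm the PHP and extension prerequisites, here is a small sketch built on the built-in version_compare() and phpversion() functions. The checkEnvironment() name is hypothetical. The library is a Composer package rather than an extension, so check its version separately, for example with `composer show mongodb/mongodb`.

```php
<?php
declare(strict_types=1);

// Minimum versions stated in this tutorial.
const MIN_PHP = '8.1.0';
const MIN_EXTENSION = '1.16.0';

/**
 * Hypothetical helper: returns a list of problems; an empty list means the environment looks ready.
 *
 * @return array<int, string>
 */
function checkEnvironment(): array
{
    $problems = [];

    if (version_compare(PHP_VERSION, MIN_PHP, '<')) {
        $problems[] = sprintf('PHP %s or later is required, found %s.', MIN_PHP, PHP_VERSION);
    }

    // phpversion() returns false when the extension is not loaded.
    $extVersion = phpversion('mongodb');
    if ($extVersion === false) {
        $problems[] = 'The mongodb extension is not loaded.';
    } elseif (version_compare($extVersion, MIN_EXTENSION, '<')) {
        $problems[] = sprintf('mongodb extension %s or later is required, found %s.', MIN_EXTENSION, $extVersion);
    }

    return $problems;
}

$problems = checkEnvironment();
echo $problems === [] ? "Environment looks ready.\n" : implode("\n", $problems) . "\n";
```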

Connect to your Atlas cluster using PHP

At this point, you should have the driver installed, a database cluster with the sample dataset loaded, and everything required to execute a PHP script. Create a file named aggregation.php with the following code:
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\Client;
use MongoDB\Driver\ServerApi;

// Replace the placeholder with your Atlas connection string
$uri = "<your connection string>";

// Specify Stable API version 1
$apiVersion = new ServerApi(ServerApi::V1);

try {
    // Create a new client and send a ping to confirm a successful connection
    $client = new Client($uri, [], ['serverApi' => $apiVersion]);
    $client->selectDatabase('admin')->command(['ping' => 1]);
    echo "Pinged your deployment. You successfully connected to MongoDB!\n";
} catch (Exception $e) {
    // Print the error as plain text and stop: the examples below need a working connection
    echo $e->getMessage(), "\n";
    exit(1);
}

// We will write all the aggregation example code from here.
This code uses the ping admin command to check whether your machine can connect to the cluster using PHP and the MongoDB PHP driver. We will use the PHP CLI to run this script, and each of the following examples can be appended to the same file.
$ php aggregation.php
Pinged your deployment. You successfully connected to MongoDB!
(Running code block using PHP CLI for checking connectivity)

Let's dive into complex aggregation pipelines using PHP

In this section, we will use aggregation pipelines to get useful insights from the sample Mflix dataset that we loaded at the beginning of this tutorial. These examples show the most commonly used aggregation stages and how data passes from one stage to the next. Some of them are more complex, which can offer some perspective on how you could handle similar scenarios in your own projects.

Find users who have commented the most on movies

To solve this problem, we will break it down into the following stages:
  1. Group users by a unique identifier.
  2. Sum the total comments left by each user.
  3. Sort the results in descending order by the total comments.
  4. Display the top five users.
Let's convert the above steps into an aggregation query in PHP.
// Append to aggregation.php: $client is created by the connection code above
$commentsCollection = $client->sample_mflix->comments;
$pipeline = [
    [
        '$group' => [
            '_id' => '$email', // Group by user email
            'totalComments' => ['$sum' => 1] // Add 1 for every comment by that user
        ]
    ],
    [
        '$sort' => ['totalComments' => -1] // Sort by total comments in descending order
    ],
    [
        '$limit' => 5 // Limit the results to the top 5 users
    ]
];
$result = $commentsCollection->aggregate($pipeline);
$resultArray = $result->toArray();
// Print the result documents as pretty JSON for the CLI
echo json_encode($resultArray, JSON_PRETTY_PRINT), "\n";

Results

Pinged your deployment. You successfully connected to MongoDB!
[
  {
    "_id": "roger_ashton-griffiths@gameofthron.es",
    "totalComments": 277
  },
  {
    "_id": "ron_donachie@gameofthron.es",
    "totalComments": 260
  },
  {
    "_id": "jonathan_pryce@gameofthron.es",
    "totalComments": 260
  },
  {
    "_id": "nathalie_emmanuel@gameofthron.es",
    "totalComments": 258
  },
  {
    "_id": "robert_jordan@fakegmail.com",
    "totalComments": 257
  }
]

Explanation

  1. Group users by email
    • The $group stage groups documents by the email field.
    • It creates a new field, totalComments, holding the number of comments by each user.
    • $sum: 1 adds 1 for every comment document with that email address.
  2. Sort results
    • The $sort stage sorts the grouped documents from the first step by the totalComments field in descending order.
  3. Display top five results
    • The $limit stage restricts the output to the top five users.
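To see what the three stages compute, here is a plain-PHP equivalent of $group with $sum: 1, a descending $sort, and $limit, run over a handful of hypothetical comment documents. It is only a model of the logic: on the real collection, let MongoDB run the pipeline so that the comments never have to be loaded into PHP. The limit is 2 here only because the sample is tiny.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for documents in the comments collection.
$comments = [
    ['email' => 'a@example.com', 'text' => 'Great'],
    ['email' => 'b@example.com', 'text' => 'Meh'],
    ['email' => 'a@example.com', 'text' => 'Loved it'],
    ['email' => 'c@example.com', 'text' => 'Nice'],
    ['email' => 'a@example.com', 'text' => 'Again!'],
    ['email' => 'b@example.com', 'text' => 'Fine'],
];

// Like $group with '_id' => '$email' and 'totalComments' => ['$sum' => 1]:
// add 1 to the bucket of each document's email.
$totals = [];
foreach ($comments as $comment) {
    $totals[$comment['email']] = ($totals[$comment['email']] ?? 0) + 1;
}

// Like $sort with 'totalComments' => -1: highest count first, keeping the email keys.
arsort($totals);

// Like $limit: keep the first two groups.
$top = array_slice($totals, 0, 2, true);

foreach ($top as $email => $count) {
    printf("%s: %d\n", $email, $count);
}
```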

Get monthly average ratings over time

To solve this problem, we will break it down into the following stages:
  1. Ensure that the documents have an imdb.rating field and a released date.
  2. Extract the year and month from the released date.
  3. Group and calculate the average rating for each month.
  4. Sort the results by year and month.
Let's convert the above steps into an aggregation query in PHP.
// Append to aggregation.php: $client is created by the connection code above
$moviesCollection = $client->sample_mflix->movies;
$pipeline = [
    [
        '$match' => [
            'imdb.rating' => ['$exists' => true], // Ensure that the movie has an IMDb rating
            'released' => ['$type' => 'date'] // Ensure that the movie has a released date
        ]
    ],
    [
        '$addFields' => [
            'month' => ['$month' => '$released'], // Extract month from released date
            'year' => ['$year' => '$released'], // Extract year from released date
        ]
    ],
    [
        '$group' => [
            '_id' => [
                'year' => '$year',
                'month' => '$month'
            ],
            'averageRating' => ['$avg' => '$imdb.rating'] // Calculate the average rating
        ]
    ],
    [
        '$sort' => [
            '_id.year' => 1,
            '_id.month' => 1
        ]
    ],
];
$result = $moviesCollection->aggregate($pipeline);
// Convert the result to an array and print it as pretty JSON
$resultArray = $result->toArray();
echo json_encode($resultArray, JSON_PRETTY_PRINT), "\n";

Results

Pinged your deployment. You successfully connected to MongoDB!
[
  {
    "_id": {
      "year": 1914,
      "month": 3
    },
    "averageRating": 7.6
  },
  {
    "_id": {
      "year": 1914,
      "month": 9
    },
    "averageRating": 7.3
  },
  {
    "_id": {
      "year": 1914,
      "month": 12
    },
    "averageRating": 5.8
  },
  {
    "_id": {
      "year": 1915,
      "month": 1
    },
    "averageRating": 6.4
  },
  {
    "_id": {
      "year": 1915,
      "month": 9
    },
    "averageRating": 6.8
  },
  ...
]

Explanation

  1. Match stage
    • The $match stage passes on only the documents that meet the given conditions.
    • Here, it keeps movies that have an imdb.rating field and discards the rest.
  2. AddFields stage
    • The $addFields stage adds year and month fields extracted from the released date.
  3. Group stage
    • The $group stage groups documents by the year and month fields.
    • $avg calculates the average of the imdb.rating values in each group.
  4. Sort stage
    • The $sort stage sorts the results in ascending order by year, then by month.
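The plain-PHP sketch below mirrors the same filter, extract, group, and sort logic over hypothetical movie documents, using DateTimeImmutable in place of BSON dates. It illustrates why step 1 checks for both fields: a movie without a release date has no year or month to group by, and one without a rating has nothing to average. As with MongoDB's $year and $month operators by default, the year and month are taken in UTC.

```php
<?php
declare(strict_types=1);

$utc = new DateTimeZone('UTC');

// Hypothetical movie documents.
$movies = [
    ['title' => 'A', 'released' => new DateTimeImmutable('1914-03-10', $utc), 'imdb' => ['rating' => 7.0]],
    ['title' => 'B', 'released' => new DateTimeImmutable('1914-03-22', $utc), 'imdb' => ['rating' => 8.0]],
    ['title' => 'C', 'released' => new DateTimeImmutable('1915-01-05', $utc), 'imdb' => ['rating' => 6.0]],
    ['title' => 'D', 'released' => null, 'imdb' => ['rating' => 9.0]],                    // no release date
    ['title' => 'E', 'released' => new DateTimeImmutable('1915-01-30', $utc), 'imdb' => []], // no rating
];

$buckets = [];
foreach ($movies as $movie) {
    // Like $match: require both a rating and a release date.
    if (!isset($movie['imdb']['rating']) || !($movie['released'] instanceof DateTimeInterface)) {
        continue;
    }
    // Like $addFields: extract the year and month.
    $year = (int) $movie['released']->format('Y');
    $month = (int) $movie['released']->format('n');
    $key = sprintf('%04d-%02d', $year, $month);

    // Like $group: keep a running sum and count so the average can be computed at the end.
    $buckets[$key] ??= ['year' => $year, 'month' => $month, 'sum' => 0.0, 'count' => 0];
    $buckets[$key]['sum'] += $movie['imdb']['rating'];
    $buckets[$key]['count']++;
}

// Like $sort by year, then month: zero-padded 'YYYY-MM' keys sort chronologically.
ksort($buckets);

$averages = [];
foreach ($buckets as $bucket) {
    $averages[] = [
        'year' => $bucket['year'],
        'month' => $bucket['month'],
        'averageRating' => $bucket['sum'] / $bucket['count'],
    ];
}
print_r($averages);
```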

Find user engagement metrics: Most commented movies, Most commented genres

We will use $facet here to show how two metrics that need different operations can be computed in a single query over the same set of input documents.
The idea is to get the top 5 most commented movies and, from the same movies collection, the top 5 most commented genres.
To solve this problem, we will break it down into the following stages:
  1. Join each movie with its comments.
  2. Calculate the top 5 most commented movies.
  3. Calculate the top 5 most commented genres.
  4. Return both metrics from a single query by running steps 2 and 3 side by side in $facet.
Let's convert the above steps into an aggregation query in PHP.
// Append to aggregation.php: $client is created by the connection code above
$moviesCollection = $client->sample_mflix->movies;
$pipeline = [
    [
        // Join each movie with its comments, stored in a 'comments' array field
        '$lookup' => [
            'from' => 'comments',
            'localField' => '_id',
            'foreignField' => 'movie_id',
            'as' => 'comments'
        ]
    ],
    [
        // Run two independent sub-pipelines over the same joined documents
        '$facet' => [
            'mostCommentedMovies' => [
                [
                    '$project' => [
                        '_id' => 0,
                        'title' => 1,
                        'comment_count' => ['$size' => '$comments']
                    ]
                ],
                [
                    '$sort' => ['comment_count' => -1]
                ],
                [
                    '$limit' => 5
                ]
            ],
            'mostCommentedGenre' => [
                [
                    '$unwind' => '$genres'
                ],
                [
                    '$group' => [
                        '_id' => '$genres',
                        'totalComments' => ['$sum' => ['$size' => '$comments']]
                    ]
                ],
                [
                    '$sort' => ['totalComments' => -1]
                ],
                [
                    '$limit' => 5
                ]
            ],
        ]
    ]
];
$result = $moviesCollection->aggregate($pipeline);
// Convert the result to an array and print it as pretty JSON
$resultArray = $result->toArray();
echo json_encode($resultArray, JSON_PRETTY_PRINT), "\n";

Results

Pinged your deployment. You successfully connected to MongoDB!
[
  {
    "mostCommentedMovies": [
      {
        "title": "The Taking of Pelham 1 2 3",
        "comment_count": 161
      },
      {
        "title": "Ocean's Eleven",
        "comment_count": 158
      },
      {
        "title": "About a Boy",
        "comment_count": 158
      },
      {
        "title": "Terminator Salvation",
        "comment_count": 158
      },
      {
        "title": "50 First Dates",
        "comment_count": 158
      }
    ],
    "mostCommentedGenre": [
      {
        "_id": "Comedy",
        "totalComments": 15435
      },
      {
        "_id": "Adventure",
        "totalComments": 12363
      },
      {
        "_id": "Drama",
        "totalComments": 12339
      },
      {
        "_id": "Action",
        "totalComments": 11419
      },
      {
        "_id": "Fantasy",
        "totalComments": 6955
      }
    ]
  }
]

Explanation

  1. Lookup stage
    • The $lookup stage joins the movies collection with the comments collection, matching each movie's _id to the movie_id field of its comments.
    • The result of the join is stored in an array field named comments.
  2. Facet stage
    • The $facet stage runs multiple aggregation pipelines within a single stage on the same input documents, so it can derive different metrics and views from the same dataset at once.
  3. mostCommentedMovies metric pipeline
    • The $project stage reshapes documents: it can add new fields and show or hide existing ones. We hide the _id field, keep the title field, and add a comment_count field computed with $size.
    • The $sort stage sorts the movies by comment count in descending order.
    • The $limit stage limits the output to five documents.
  4. mostCommentedGenre metric pipeline
    • The $unwind stage flattens the genres array so that each genre becomes its own document.
    • The $group stage groups the documents by genre and sums the comment counts for that genre.
    • The $sort stage sorts the genres by total number of comments in descending order.
    • The $limit stage limits the output to five documents.
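One consequence of the $unwind step deserves a closer look: a movie listed under several genres contributes all of its comments to each of them. The plain-PHP sketch below, using hypothetical documents shaped like the output of the $lookup stage, reproduces the unwind-and-sum logic of the mostCommentedGenre facet and shows the genre totals adding up to more than the number of comments that exist. That is the right answer for "comments per genre", but keep it in mind before summing the genre totals.

```php
<?php
declare(strict_types=1);

// Hypothetical movies after the $lookup: each has a genres array and its joined comments.
$movies = [
    ['title' => 'A', 'genres' => ['Comedy', 'Drama'], 'comments' => ['c1', 'c2', 'c3']],
    ['title' => 'B', 'genres' => ['Comedy'], 'comments' => ['c4']],
    ['title' => 'C', 'genres' => ['Action'], 'comments' => ['c5', 'c6']],
];

// Like $unwind: emit one copy of the movie per genre.
$unwound = [];
foreach ($movies as $movie) {
    foreach ($movie['genres'] as $genre) {
        $unwound[] = ['genres' => $genre, 'comments' => $movie['comments']];
    }
}

// Like $group by genre with ['$sum' => ['$size' => '$comments']], then a descending $sort.
$totals = [];
foreach ($unwound as $doc) {
    $totals[$doc['genres']] = ($totals[$doc['genres']] ?? 0) + count($doc['comments']);
}
arsort($totals);
print_r($totals);

// Movie A has two genres, so its three comments are counted twice across genres.
$allComments = array_merge(...array_column($movies, 'comments'));
echo array_sum($totals), " vs ", count($allComments), "\n"; // prints "9 vs 6"
```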

Content preferences analysis: Find out users' preferences in terms of movie genres

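Content preferences can be inferred from user engagement: the more a user comments on movies of a genre, the more that genre likely appeals to them. To solve this problem, we will break it down into the following stages:
  1. Join each comment with the movie it belongs to.
  2. Flatten each movie's genres so that every genre is counted separately.
  3. Count comments per user and genre.
  4. Collect each user's per-genre counts into one document and sort users by total comments.
Let's convert the above steps into an aggregation query in PHP.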
// Append to aggregation.php: $client is created by the connection code above
$commentsCollection = $client->sample_mflix->comments;
$pipeline = [
    ['$lookup' => [
        'from' => 'movies',
        'localField' => 'movie_id',
        'foreignField' => '_id',
        'as' => 'movie'
    ]],
    ['$unwind' => '$movie'],
    ['$replaceRoot' => ['newRoot' => [
        'comment_id' => '$_id',
        'user' => '$name',
        'movie_id' => '$movie_id',
        'movie_title' => '$movie.title',
        'genres' => '$movie.genres',
        'rating' => '$movie.imdb.rating',
        'comment' => '$text'
    ]]],
    ['$unwind' => '$genres'],
    ['$group' => [
        '_id' => [
            'user' => '$user',
            'genre' => '$genres'
        ],
        'totalComments' => ['$sum' => 1],
        'averageRating' => ['$avg' => '$rating']
    ]],
    ['$group' => [
        '_id' => '$_id.user',
        'preferences' => [
            '$push' => [
                'genre' => '$_id.genre',
                'totalComments' => '$totalComments',
                // Round the per-genre average from the previous stage to 2 decimal places
                'averageRating' => ['$round' => ['$averageRating', 2]]
            ]
        ],
        'totalComments' => ['$sum' => '$totalComments']
    ]],
    ['$sort' => ['totalComments' => -1]]
];

$result = $commentsCollection->aggregate($pipeline);
// Convert the result to an array and print it as pretty JSON
$resultArray = $result->toArray();
echo json_encode($resultArray, JSON_PRETTY_PRINT), "\n";

Explanation

  1. Lookup stage
    • The $lookup stage performs a left outer join from the comments collection to the movies collection, matching each comment's movie_id to a movie's _id to fetch the movie details.
  2. Unwind stages
    • The first $unwind stage flattens the joined movie array, and the second flattens the genres array so that each genre is treated separately.
  3. ReplaceRoot stage
    • The $replaceRoot stage replaces each document with a new structure that includes the user, movie details, genres, rating, and comment.
  4. First Group stage
    • Groups the documents by user and genre, calculating: i. totalComments, the total number of comments made by each user on movies of each genre. ii. averageRating, the average IMDb rating of the movies of that genre that the user commented on.
  5. Second Group stage
    • Groups the documents again by user, pushing each genre's comment count and its average rating, rounded to two decimal places, into a preferences array. It also sums the user's per-genre comment counts; because of the genres $unwind, a comment on a movie with several genres is counted once per genre.
  6. Sort stage
    • The $sort stage sorts the results by total comments in descending order.
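The two consecutive $group stages are the trickiest part of this pipeline, so here is a plain-PHP sketch of them over hypothetical rows shaped like the output of the $replaceRoot and second $unwind stages. The first loop builds one bucket per user and genre; the second folds those buckets into one document per user. One difference to keep in mind: MongoDB does not guarantee the order of documents leaving $group, so the order of entries inside preferences may vary, whereas this sketch keeps insertion order.

```php
<?php
declare(strict_types=1);

// Hypothetical rows after $replaceRoot and $unwind: one row per (comment, genre).
$rows = [
    ['user' => 'Ann', 'genres' => 'Drama', 'rating' => 7.0],
    ['user' => 'Ann', 'genres' => 'Drama', 'rating' => 8.0],
    ['user' => 'Ann', 'genres' => 'Comedy', 'rating' => 6.5],
    ['user' => 'Bob', 'genres' => 'Comedy', 'rating' => 5.0],
];

// First $group: key on (user, genre); count comments and collect ratings for the average.
$byUserGenre = [];
foreach ($rows as $row) {
    $key = $row['user'] . "\0" . $row['genres'];
    $byUserGenre[$key] ??= ['user' => $row['user'], 'genre' => $row['genres'], 'totalComments' => 0, 'ratings' => []];
    $byUserGenre[$key]['totalComments']++;
    $byUserGenre[$key]['ratings'][] = $row['rating'];
}

// Second $group: key on user; push one preference entry per genre and sum the comment counts.
$byUser = [];
foreach ($byUserGenre as $group) {
    $user = $group['user'];
    $byUser[$user] ??= ['_id' => $user, 'preferences' => [], 'totalComments' => 0];
    $byUser[$user]['preferences'][] = [
        'genre' => $group['genre'],
        'totalComments' => $group['totalComments'],
        'averageRating' => round(array_sum($group['ratings']) / count($group['ratings']), 2),
    ];
    $byUser[$user]['totalComments'] += $group['totalComments'];
}

// $sort by totalComments, descending.
$result = array_values($byUser);
usort($result, fn (array $a, array $b): int => $b['totalComments'] <=> $a['totalComments']);

print_r($result);
```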

Real-time new comment notifications and live feed update

To solve this problem, we will break it down into the following stages:
  1. When a user makes a comment, send a notification.
  2. When a user makes a comment, update a live feed as well.
Let's implement these steps in PHP with a change stream, which watches the comments collection for new documents.
// Append to aggregation.php: $client is created by the connection code above
$commentsCollection = $client->sample_mflix->comments;
$liveFeedCollection = $client->sample_mflix->live_feed;
$changeStream = $commentsCollection->watch();

// Placeholder: a real application would call a notification service here
function sendNotification($comment) {
    printf("Notification sent for new comment by %s on movie ID %s\n", $comment['name'], $comment['movie_id']);
}

// Record the new comment in the live_feed collection
function updateLiveFeed($comment, $liveFeedCollection) {
    $liveFeedDocument = [
        'user' => $comment['name'],
        'movie_id' => $comment['movie_id'],
        'comment' => $comment['text'],
        'timestamp' => new MongoDB\BSON\UTCDateTime()
    ];
    $liveFeedCollection->insertOne($liveFeedDocument);
    printf("Live feed updated with new comment by %s\n", $comment['name']);
}

// Loop indefinitely: next() fetches further events, and valid() is false while none is available
for ($changeStream->rewind(); true; $changeStream->next()) {
    if (!$changeStream->valid()) {
        continue;
    }

    $event = $changeStream->current();

    // Stop if the stream is invalidated
    if ($event['operationType'] === 'invalidate') {
        break;
    }

    if ($event['operationType'] === 'insert') {
        $newComment = $event['fullDocument'];
        sendNotification($newComment);
        updateLiveFeed($newComment, $liveFeedCollection);
    }
}

Results

  • Notification: when a new comment is inserted, the sendNotification function is called; in this example it only prints a message where a real notification would be sent.
  • Live feed update: the updateLiveFeed function inserts the new comment into the live_feed collection, which could be used to display recent activities on a website or application dashboard.

Explanation

  1. Change stream
    • The watch() method opens a change stream on the comments collection, which listens for changes in real time. Under the hood, the driver runs an aggregation whose first stage is $changeStream.
  2. sendNotification function
    • This placeholder function simulates sending a notification whenever a new comment is added.
  3. updateLiveFeed function
    • This updates the live_feed collection with the new comment to keep a record of the latest activities.
  4. For loop
    • This iterates over the change stream, processing each change event. If the event is an insert, it triggers the notification function and updates the live feed; if the stream is invalidated (for example, because the collection is dropped), the loop stops.
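Because watch() accepts an aggregation pipeline as its first argument, the insert filtering done in the loop can also happen on the server, so that update and delete events never reach PHP. Below is a minimal sketch; insertOnlyPipeline() is a hypothetical helper, written as a plain function so the pipeline can be inspected without a cluster.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: a change stream pipeline that only lets insert events through.
 * Intended as the first argument of MongoDB\Collection::watch().
 *
 * @return array<int, array<string, mixed>>
 */
function insertOnlyPipeline(): array
{
    return [
        ['$match' => ['operationType' => 'insert']],
    ];
}

// Show the pipeline as it would be sent to the server.
echo json_encode(insertOnlyPipeline(), JSON_PRETTY_PRINT), "\n";
```

You would then open the stream with `$changeStream = $commentsCollection->watch(insertOnlyPipeline());` and keep the existing loop, including its operationType checks, unchanged. On a busy collection this avoids shipping events to PHP only to discard them.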

Summary

The examples we have seen so far cover commonly used aggregation pipeline stages and operators. From analysing user engagement metrics to reacting to user actions in real time, MongoDB's aggregation pipeline allows for complex data transformation and insightful analytics.
While we've covered a few stages essential for aggregating and analysing data, MongoDB offers many more aggregation pipeline stages. To delve deeper into more advanced stages and analysis, refer to the official MongoDB documentation and the official PHP driver documentation.
Expand your understanding and leverage MongoDB's aggregation framework efficiently in PHP to address diverse data processing requirements and enhance your application's capabilities.
If you have any questions, or want to discuss this or any new use cases further, you can reach out to me on LinkedIn.