Handling Complex Aggregation Pipelines With C#
As shown in my last article on the basics of running aggregation pipelines with the MongoDB C# driver, there are plenty of ways to set up and run aggregation pipelines from a .NET application. The MongoDB C# driver includes a powerful LINQ provider that simplifies building aggregation pipelines for developers who are more experienced with C# than with MongoDB.
For experienced developers, or when you have complex aggregation pipelines, you will likely use MongoDB Compass or another tool to set up, test, and fine-tune an aggregation pipeline that you want to run from a C# application later on. Using this easy workflow, you can use all aggregation pipeline stages and build the pipeline on a realistic dataset before including it in code. Also, for experienced MongoDB developers, it might be easier to read the aggregation pipeline in JSON form instead of C# statements.
This tutorial focuses on an approach that enables experienced MongoDB developers to use pipelines from MongoDB Compass and include them in C# code. This approach complements the basic methods shown in my last tutorial and can be used for complex scenarios.
The samples that are used throughout this article are based on the movies collection in the sample_mflix database. We will use the following Data Transfer Object (DTO) classes:
    [ ]
    [ ]
    public class Movie
    {
        [ ]
        public required string Title { get; set; }

        [ ]
        public required int Year { get; set; }

        [ ]
        public List<string> Cast { get; set; } = new();

        [ ]
        public Imdb Imdb { get; set; } = new();
    }

    [ ]
    public class Imdb
    {
        [ ]
        public double Rating { get; set; }
    }

    public class RatingByYear
    {
        [ ]
        public int Year { get; set; }

        [ ]
        public double AvgRating { get; set; }
    }
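The mapping attributes are what tie these classes to the documents in the collection and to the output of the pipeline. The following is a minimal sketch of one possible mapping, assuming the field names that the pipelines in this article use (title, year, cast, imdb.rating, and _id and rating for the grouped result). The attribute choices are assumptions for illustration and not necessarily the ones used by the original classes.

```csharp
using System.Collections.Generic;
using MongoDB.Bson.Serialization.Attributes;

// Assumption: fields of the movie documents that are not mapped here are ignored.
[BsonIgnoreExtraElements]
public class Movie
{
    [BsonElement("title")]
    public required string Title { get; set; }

    [BsonElement("year")]
    public required int Year { get; set; }

    [BsonElement("cast")]
    public List<string> Cast { get; set; } = new();

    [BsonElement("imdb")]
    public Imdb Imdb { get; set; } = new();
}

// Assumption: only the rating of the imdb sub-document is needed.
[BsonIgnoreExtraElements]
public class Imdb
{
    [BsonElement("rating")]
    public double Rating { get; set; }
}

public class RatingByYear
{
    // The $group stage writes the year to _id.
    [BsonId]
    public int Year { get; set; }

    // The $group stage writes the average to the field "rating".
    [BsonElement("rating")]
    public double AvgRating { get; set; }
}
```

With a mapping like this, a result document such as { _id: 1995, rating: 8.2 } deserializes into a RatingByYear with Year 1995 and AvgRating 8.2.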
When setting up an aggregation pipeline in MongoDB Compass, you can retrieve the JSON for your pipeline by switching to the Text view:
This shows the complete pipeline so that you can easily copy the JSON to the clipboard. As a first step, you can include the JSON in a C# multiline string. In our case, we use the aggregation from the last article for demonstration purposes:
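    using MongoDB.Bson;
    using MongoDB.Bson.Serialization;
    using MongoDB.Driver;

    // Pipeline as copied from the Text view in MongoDB Compass
    var pipelineStr = """
    [
      {
        $match: {
          cast: "Robert De Niro"
        }
      },
      {
        $group: {
          _id: "$year",
          rating: { $avg: "$imdb.rating" }
        }
      },
      {
        $sort: {
          rating: -1
        }
      }
    ]
    """;

    // Parse the JSON array and turn every element into a BsonDocument stage;
    // the resulting array converts implicitly to a PipelineDefinition.
    PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
        .Select(x => x.AsBsonDocument)
        .ToArray();

    // movies is the IMongoCollection<Movie> for the movies collection
    var result = await (await movies.AggregateAsync<RatingByYear>(pipeline)).ToListAsync();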
The above code deserializes the pipeline from the multiline string and converts it to an array of BsonDocument objects. There is an implicit conversion operator that allows converting the array to a PipelineDefinition<Movie, RatingByYear> object. This pipeline definition can then be supplied as a variable to the AggregateAsync method that runs the aggregation and returns the result.

Instead of including the pipeline as a string in your C# code, you can also read it from a file that you deploy with your application, or embed a JSON file as a resource in your application:
This provides a better separation between the C# code and the pipelines. You can access the resource like this:
    // Be sure to use the correct resource name; the naming scheme is as follows:
    // <PROJECT ROOT NAMESPACE>.<Folders>.<FileName>
    // If the resource name is invalid, GetManifestResourceStream returns null
    using var stream = typeof(Program).Assembly.GetManifestResourceStream("MyNamespace.Pipelines.RatingByYear.json");
    using var reader = new StreamReader(stream!);
    var pipelineStr = await reader.ReadToEndAsync();

    PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
        .Select(x => x.AsBsonDocument)
        .ToArray();

    var result = await (await movies.AggregateAsync(pipeline)).ToListAsync();
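Because the loading and parsing steps are the same for every pipeline, they can be moved into a small helper. The following is a sketch under stated assumptions: the class name PipelineLoader and its method names are hypothetical and not part of the driver. It also covers the option of reading the pipeline from a file deployed with the application, and it fails with a descriptive exception instead of a NullReferenceException when the resource name is wrong.

```csharp
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;

// Hypothetical helper; the names are chosen for illustration only.
public static class PipelineLoader
{
    // Parses a JSON array of stages (as copied from Compass) into a typed pipeline.
    public static PipelineDefinition<TInput, TOutput> Parse<TInput, TOutput>(string json)
    {
        BsonDocument[] stages = BsonSerializer.Deserialize<BsonArray>(json)
            .Select(stage => stage.AsBsonDocument)
            .ToArray();
        return stages; // implicit conversion from BsonDocument[]
    }

    // Loads the pipeline from a JSON file embedded as a resource.
    public static async Task<PipelineDefinition<TInput, TOutput>> FromResourceAsync<TInput, TOutput>(
        Assembly assembly, string resourceName)
    {
        using Stream stream = assembly.GetManifestResourceStream(resourceName)
            ?? throw new FileNotFoundException(
                $"Embedded resource '{resourceName}' not found. Available resources: " +
                string.Join(", ", assembly.GetManifestResourceNames()));
        using var reader = new StreamReader(stream);
        return Parse<TInput, TOutput>(await reader.ReadToEndAsync());
    }

    // Loads the pipeline from a JSON file that is deployed next to the application.
    public static async Task<PipelineDefinition<TInput, TOutput>> FromFileAsync<TInput, TOutput>(string path)
        => Parse<TInput, TOutput>(await File.ReadAllTextAsync(path));
}
```

A call such as PipelineLoader.FromResourceAsync<Movie, RatingByYear>(typeof(Program).Assembly, "MyNamespace.Pipelines.RatingByYear.json") would then replace the loading code above. Listing the available resource names in the exception message makes a mistyped name easier to spot.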
Having a static pipeline is a good start, but what if you want to make adjustments when running the aggregation pipeline? In our example, we might be interested in querying other members of the cast and only return a configurable number of years with the top ratings.
    [
      {
        "$match": {
          "$expr": {
            "$in": [
              "$$castMember",
              {
                "$ifNull": [
                  "$cast",
                  []
                ]
              }
            ]
          }
        }
      },
      {
        "$group": {
          "_id": "$year",
          "rating": { "$avg": "$imdb.rating" }
        }
      },
      {
        "$setWindowFields": {
          "sortBy": { "rating": -1 },
          "output": {
            "docNo": {
              "$documentNumber": {}
            }
          }
        }
      },
      {
        "$match": {
          "$expr": {
            "$lte": [ "$docNo", "$$topN" ]
          }
        }
      },
      {
        "$unset": "docNo"
      }
    ]
The above pipeline has been adjusted to use the variables $$castMember and $$topN. We will take a closer look at these adjustments in the following section. For now, we focus on running the pipeline in C#. To provide values for the variables, an AggregateOptions object is used, as shown below:

    using var stream = typeof(Program).Assembly.GetManifestResourceStream("MyNamespace.Pipelines.RatingByYearParameterized.json");
    using var reader = new StreamReader(stream!);
    var pipelineStr = await reader.ReadToEndAsync();

    PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
        .Select(x => x.AsBsonDocument)
        .ToArray();

    // Let provides the values for the variables used in the pipeline
    var options = new AggregateOptions()
    {
        Let = new BsonDocument()
        {
            { "castMember", "Al Pacino" },
            { "topN", 10 },
        }
    };

    var result = await (await movies.AggregateAsync(pipeline, options)).ToListAsync();
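In an application, the variable values usually come from method parameters rather than constants. The following sketch wraps the call in a reusable method and validates the input first, since a topN below 1 would silently produce an empty result. The class and method names are hypothetical. Note that both the let option of the aggregate command and the $setWindowFields stage need MongoDB 5.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical wrapper; the names are chosen for illustration only.
public static class TopRatedYears
{
    // Builds the options that carry the values for $$castMember and $$topN.
    public static AggregateOptions BuildOptions(string castMember, int topN)
    {
        if (string.IsNullOrWhiteSpace(castMember))
            throw new ArgumentException("A cast member name is required.", nameof(castMember));
        if (topN < 1)
            throw new ArgumentOutOfRangeException(nameof(topN), "topN must be at least 1.");

        return new AggregateOptions
        {
            Let = new BsonDocument
            {
                { "castMember", castMember },
                { "topN", topN },
            }
        };
    }

    // Runs the parameterized pipeline and materializes the result as a list.
    public static async Task<List<TOutput>> RunAsync<TInput, TOutput>(
        IMongoCollection<TInput> collection,
        PipelineDefinition<TInput, TOutput> pipeline,
        string castMember,
        int topN,
        CancellationToken cancellationToken = default)
    {
        var options = BuildOptions(castMember, topN);
        using var cursor = await collection.AggregateAsync(pipeline, options, cancellationToken);
        return await cursor.ToListAsync(cancellationToken);
    }
}
```

With the collection and pipeline from above, the call would read await TopRatedYears.RunAsync(movies, pipeline, "Al Pacino", 10).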
As you have seen, the pipeline had to be adjusted to use the variables. This is necessary because not all stages evaluate variables as one might expect. For instance, the first $match stage does not evaluate a variable when using the standard $match syntax:

    {
      $match: {
        cast: "$$castMember"
      }
    },
Because the condition as such is valid for MongoDB, this does not raise an error but leads to an empty result set, as there is no movie with a cast member that goes by the unusual name $$castMember. Using $expr in the condition ensures that the variables are replaced with their values when performing the comparison:

    {
      "$match": {
        "$expr": {
          "$in": [
            "$$castMember",
            {
              "$ifNull": [
                "$cast",
                []
              ]
            }
          ]
        }
      }
    },
In addition, the expression syntax handles missing and null values differently: the $in expression raises an error if its second argument is not an array. So we need to add an $ifNull expression that ensures that missing or null values are replaced with an empty array.

In most cases, however, MongoDB raises an error when the variable cannot be used. In our sample, the pipeline cannot use the $limit stage because this stage expects a static value for the maximum number of documents that should be returned. Hence, we have to replace the simple $limit stage with a combination of:

- $setWindowFields that sorts the documents and adds a property docNo for the number of the document.
- $match that keeps only the number of documents specified in the variable (also with an $expr condition).
- $unset to remove the docNo property as we do not need it in our result set.
    {
      "$setWindowFields": {
        "sortBy": { "rating": -1 },
        "output": {
          "docNo": {
            "$documentNumber": {}
          }
        }
      }
    },
    {
      "$match": {
        "$expr": {
          "$lte": [ "$docNo", "$$topN" ]
        }
      }
    },
    {
      "$unset": "docNo"
    }
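Since the pipeline is parsed into BsonDocument stages in C# anyway, an alternative to the three stages above is to keep the JSON file free of the limit and append $sort and $limit stages in code with the actual value. This is a different technique from the one used in this article, and the sketch below uses hypothetical names. The trade-off is that the JSON file no longer contains the complete pipeline, so it cannot be copied to MongoDB Compass one-to-one.

```csharp
using System;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;

// Hypothetical helper; the names are chosen for illustration only.
public static class PipelineExtensions
{
    // Parses the JSON stages and appends a descending $sort and a $limit stage.
    public static PipelineDefinition<TInput, TOutput> WithSortAndLimit<TInput, TOutput>(
        string pipelineJson, string sortField, int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit), "$limit requires a positive value.");

        var stages = BsonSerializer.Deserialize<BsonArray>(pipelineJson)
            .Select(stage => stage.AsBsonDocument)
            .ToList();

        // The value is a regular C# variable, so no pipeline variable is needed.
        stages.Add(new BsonDocument("$sort", new BsonDocument(sortField, -1)));
        stages.Add(new BsonDocument("$limit", limit));

        return stages.ToArray(); // implicit conversion from BsonDocument[]
    }
}
```

For the sample, the JSON file would end after the $group stage, and the call would be PipelineExtensions.WithSortAndLimit<Movie, RatingByYear>(pipelineStr, "rating", 10).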
When testing your pipeline, be aware that you also need to look for unexpected and especially empty result sets when introducing variables in a pipeline.
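One cause of silently empty results can be checked without a database: a variable reference that sits in a $match stage outside of $expr and is therefore compared as a plain string. The following sketch is a heuristic with hypothetical names. It only inspects $match stages at the top level of the pipeline and reports every string value starting with $$ that is not nested under $expr.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;

// Hypothetical lint helper; the names are chosen for illustration only.
public static class PipelineLint
{
    // Returns variable references in $match stages that MongoDB treats as literals.
    public static IEnumerable<string> FindLiteralVariables(string pipelineJson)
    {
        var stages = BsonSerializer.Deserialize<BsonArray>(pipelineJson);
        foreach (var stage in stages.Select(s => s.AsBsonDocument))
        {
            if (stage.TryGetValue("$match", out var filter))
            {
                foreach (var hit in Walk(filter))
                    yield return hit;
            }
        }
    }

    private static IEnumerable<string> Walk(BsonValue value)
    {
        switch (value)
        {
            case BsonString s when s.Value.StartsWith("$$", StringComparison.Ordinal):
                yield return s.Value;
                break;
            case BsonDocument doc:
                foreach (var element in doc)
                {
                    // Variables are evaluated inside $expr, so skip that subtree.
                    if (element.Name == "$expr") continue;
                    foreach (var hit in Walk(element.Value))
                        yield return hit;
                }
                break;
            case BsonArray array:
                foreach (var item in array)
                    foreach (var hit in Walk(item))
                        yield return hit;
                break;
        }
    }
}
```

A unit test could assert that FindLiteralVariables returns no entries for every pipeline file in the project. It does not replace a test against realistic data, but it catches the $match pitfall described above early.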
This tutorial outlined several ways to handle complex aggregation pipelines in C# while still keeping the original MongoDB pipeline definition available. You can use the latest and greatest MongoDB aggregation pipeline stages in their original form. Also, for developers who are experienced with MongoDB, the JSON format of the pipeline might be easier to read. When applying changes to the pipeline later on, you can test and analyze the exact pipeline in MongoDB Compass or the MongoDB shell.
What is your favorite method to tackle challenging aggregation pipelines with the MongoDB C# driver? Let us know in the MongoDB Developer Community Forums!