Getting Started with Aggregation Pipelines in Rust
Mark Smith • 15 min read • Published Feb 05, 2022 • Updated Oct 01, 2024

MongoDB's aggregation pipelines are one of its most powerful features. They allow you to write expressions, broken down into a series of stages, which perform operations including aggregation, transformations, and joins on the data in your MongoDB databases. This allows you to do calculations and analytics across documents and collections within your MongoDB database.
This quick start is the second in a series of Rust posts. I highly recommend you start with my first post, Basic MongoDB Operations in Rust, which will show you how to get set up correctly with a free MongoDB Atlas database cluster containing the sample data you'll be working with here. Go read it and come back. I'll wait. Without it, you won't have the database set up correctly to run the code in this quick start guide.
In summary, you'll need:
- An up-to-date version of Rust. I used 1.49, but any recent version should work well.
- A MongoDB cluster containing the sample_mflix dataset. You can find instructions for setting that up in the first blog post in this series.
MongoDB's aggregation pipelines are very powerful and so they can seem a little overwhelming at first. For this reason, I'll start off slowly. First, I'll show you how to build up a pipeline that duplicates behaviour that you can already achieve with MongoDB's find() method, but instead using an aggregation pipeline with $match, $sort, and $limit stages. Then, I'll show how to make queries that go beyond what can be done with find, demonstrating using $lookup to include related documents from another collection. Finally, I'll put the "aggregation" into "aggregation pipeline" by showing you how to use $group to group together documents to form new document summaries.

All of the sample code for this quick start series is available on GitHub. I recommend you check it out if you get stuck, but otherwise, it's worth following the tutorial and writing the code yourself!
All of the pipelines in this post will be executed against the sample_mflix database's movies collection. It contains documents that look like this (I'm showing you what they look like in Python, because it's a little more readable than the equivalent Rust struct):

    {
        '_id': ObjectId('573a1392f29313caabcdb497'),
        'awards': {'nominations': 7,
                   'text': 'Won 1 Oscar. Another 2 wins & 7 nominations.',
                   'wins': 3},
        'cast': ['Janet Gaynor', 'Fredric March', 'Adolphe Menjou', 'May Robson'],
        'countries': ['USA'],
        'directors': ['William A. Wellman', 'Jack Conway'],
        'fullplot': 'Esther Blodgett is just another starry-eyed farm kid trying to '
                    'break into the movies. Waitressing at a Hollywood party, she '
                    'catches the eye of alcoholic star Norman Maine, is given a test, '
                    'and is caught up in the Hollywood glamor machine (ruthlessly '
                    'satirized). She and her idol Norman marry; but his career '
                    'abruptly dwindles to nothing',
        'genres': ['Drama'],
        'imdb': {'id': 29606, 'rating': 7.7, 'votes': 5005},
        'languages': ['English'],
        'lastupdated': '2015-09-01 00:55:54.333000000',
        'plot': 'A young woman comes to Hollywood with dreams of stardom, but '
                'achieves them only with the help of an alcoholic leading man whose '
                'best days are behind him.',
        'poster': 'https://m.media-amazon.com/images/M/MV5BMmE5ODI0NzMtYjc5Yy00MzMzLTk5OTQtN2Q3MzgwOTllMTY3XkEyXkFqcGdeQXVyNjc0MzMzNjA@._V1_SY1000_SX677_AL_.jpg',
        'rated': 'NOT RATED',
        'released': datetime.datetime(1937, 4, 27, 0, 0),
        'runtime': 111,
        'title': 'A Star Is Born',
        'tomatoes': {'critic': {'meter': 100, 'numReviews': 11, 'rating': 7.4},
                     'dvd': datetime.datetime(2004, 11, 16, 0, 0),
                     'fresh': 11,
                     'lastUpdated': datetime.datetime(2015, 8, 26, 18, 58, 34),
                     'production': 'Image Entertainment Inc.',
                     'rotten': 0,
                     'viewer': {'meter': 79, 'numReviews': 2526, 'rating': 3.6},
                     'website': 'http://www.vcientertainment.com/Film-Categories?product_id=73'},
        'type': 'movie',
        'writers': ['Dorothy Parker (screen play)',
                    'Alan Campbell (screen play)',
                    'Robert Carson (screen play)',
                    'William A. Wellman (from a story by)',
                    'Robert Carson (from a story by)'],
        'year': 1937}
There's a lot of data there, but I'll be focusing mainly on the _id, title, year, and cast fields.

The first argument to aggregate() is a sequence of pipeline stages to be executed. Much like a query, each stage of an aggregation pipeline is a BSON document. You'll often create these using the doc! macro that was introduced in the previous post.

An aggregation pipeline operates on all of the data in a collection. Each stage in the pipeline is applied to the documents passing through, and whatever documents are emitted from one stage are passed as input to the next stage, until there are no more stages left. At this point, the documents emitted from the last stage in the pipeline are returned to the client program, as a cursor, in a similar way to a call to find().

Individual stages, such as $match, can act as a filter, to only pass through documents matching certain criteria. Other stage types, such as $project, $addFields, and $lookup, will modify the content of individual documents as they pass through the pipeline. Finally, certain stage types, such as $group, will create an entirely new set of documents based on the documents passed into it taken as a whole. None of these stages change the data that is stored in MongoDB itself. They just change the data before returning it to your program! There are stages, like $out, which can save the results of a pipeline back into MongoDB, but I won't be covering it in this quick start.

I'm going to assume that you're working in the same environment that you used for the last post, so you should already have the mongodb crate configured as a dependency in your Cargo.toml file, and you should have a .env file containing your MONGODB_URI environment variable.

First, paste the following into your Rust code:

    // Load the MongoDB connection string from an environment variable:
    let client_uri =
        env::var("MONGODB_URI").expect("You must set the MONGODB_URI environment var!");

    // An extra line of code to work around a DNS issue on Windows:
    let options =
        ClientOptions::parse_with_resolver_config(&client_uri, ResolverConfig::cloudflare())
            .await?;
    let client = mongodb::Client::with_options(options)?;

    // Get the 'movies' collection from the 'sample_mflix' database:
    let movies = client.database("sample_mflix").collection("movies");

The above code will provide a Collection instance called movies, which points to the movies collection in your database.

Here is some code which creates a pipeline, executes it with aggregate, and then loops through and prints the detail of each movie in the results. Paste it into your program.

    // Usually implemented outside your main function.
    // Needs `use serde::Deserialize;` and `use std::fmt;` at the top of the file.
    #[derive(Deserialize)]
    struct MovieSummary {
        title: String,
        cast: Vec<String>,
        year: i32,
    }

    impl fmt::Display for MovieSummary {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(
                f,
                "{}, {}, {}",
                self.title,
                self.cast.get(0).unwrap_or(&"- no cast -".to_owned()),
                self.year
            )
        }
    }

    // Inside main():
    let pipeline = vec![
        doc! {
            // filter on movie title:
            "$match": {
                "title": "A Star Is Born"
            }
        },
        doc! {
            // sort by year, ascending:
            "$sort": {
                "year": 1
            }
        },
    ];

    // Look up "A Star is Born" in ascending year order:
    let mut results = movies.aggregate(pipeline, None).await?;
    // Loop through the results, convert each of them to a MovieSummary, and then print out.
    while let Some(result) = results.next().await {
        // Use serde to deserialize into the MovieSummary struct:
        let doc: MovieSummary = bson::from_document(result?)?;
        println!("* {}", doc);
    }

This pipeline has two stages. The first is a $match stage, which is similar to querying a collection with find(). It filters the documents passing through the stage based on a read operation query. Because it's the first stage in the pipeline, its input is all of the documents in the movies collection. The query for the $match stage filters on the title field of the input documents, so the only documents that will be output from this stage will have a title of "A Star Is Born."

The second stage is a $sort stage. Only the documents for the movie "A Star Is Born" are passed to this stage, so the result will be all of the movies called "A Star Is Born," now sorted by their year field, with the oldest movie first.
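
If it helps to see the two stages without a database in the way, here is a minimal plain-Rust sketch of what $match and $sort do to the documents flowing through them. It is only a model: the Movie struct, the match_title and sort_year_ascending functions, and the sample data are all hypothetical names I've made up for illustration, and nothing here talks to MongoDB.

```rust
// Plain-Rust model of a $match stage followed by a $sort stage.
// No database is involved; all names below are hypothetical.
#[derive(Debug, Clone, PartialEq)]
struct Movie {
    title: String,
    year: i32,
}

// Models { "$match": { "title": <title> } }: keep only documents with that title.
fn match_title(docs: Vec<Movie>, title: &str) -> Vec<Movie> {
    docs.into_iter().filter(|m| m.title == title).collect()
}

// Models { "$sort": { "year": 1 } }: order by year, oldest first.
fn sort_year_ascending(mut docs: Vec<Movie>) -> Vec<Movie> {
    docs.sort_by_key(|m| m.year);
    docs
}

// Made-up sample data standing in for the movies collection.
fn sample_movies() -> Vec<Movie> {
    vec![
        Movie { title: "A Star Is Born".to_owned(), year: 1976 },
        Movie { title: "Midnight".to_owned(), year: 1939 },
        Movie { title: "A Star Is Born".to_owned(), year: 1937 },
        Movie { title: "A Star Is Born".to_owned(), year: 1954 },
    ]
}

fn main() {
    // The output of the first stage is the input of the second stage.
    let results = sort_year_ascending(match_title(sample_movies(), "A Star Is Born"));
    for movie in &results {
        println!("* {}, {}", movie.title, movie.year);
    }
}
```

The point to take away is the order of operations: the sort only ever sees the documents that survived the filter.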
Calls to aggregate() return a cursor pointing to the resulting documents. Cursor implements the Stream trait. The cursor can be looped through like any other stream, as long as you've imported StreamExt, which provides the next() method. The code above loops through all of the returned documents and prints a short summary, consisting of the title, the first actor in the cast array, and the year the movie was produced.

Running the code above results in:

    * A Star Is Born, Janet Gaynor, 1937
    * A Star Is Born, Judy Garland, 1954
    * A Star Is Born, Barbra Streisand, 1976

It is possible to build up whole aggregation pipelines as a single data structure, as in the example above, but it's not necessarily a good idea. Pipelines can get long and complex. For this reason, I recommend you build up each stage of your pipeline as a separate variable, and then combine the stages into a pipeline at the end, like this:

    // Match title = "A Star Is Born":
    let stage_match_title = doc! {
        "$match": {
            "title": "A Star Is Born"
        }
    };

    // Sort by year, ascending:
    let stage_sort_year_ascending = doc! {
        "$sort": { "year": 1 }
    };

    // Now the pipeline is easier to read:
    let pipeline = vec![stage_match_title, stage_sort_year_ascending];

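
The idea of named stages that are combined at the end can also be sketched in plain Rust, which may make the "output of one stage is the input of the next" rule easier to see. This is a toy model under my own assumptions, not how the MongoDB server or driver is implemented: Doc, Stage, match_title, sort_year_ascending, and run_pipeline are hypothetical names.

```rust
// A toy "document" is a (title, year) pair; a stage maps documents to documents.
type Doc = (String, i32);
type Stage = Box<dyn Fn(Vec<Doc>) -> Vec<Doc>>;

// Hypothetical constructor imitating a $match stage on the title.
fn match_title(title: &'static str) -> Stage {
    Box::new(move |docs: Vec<Doc>| -> Vec<Doc> {
        docs.into_iter().filter(|(t, _)| t.as_str() == title).collect()
    })
}

// Hypothetical constructor imitating { "$sort": { "year": 1 } }.
fn sort_year_ascending() -> Stage {
    Box::new(|mut docs: Vec<Doc>| -> Vec<Doc> {
        docs.sort_by_key(|(_, year)| *year);
        docs
    })
}

// Feed the documents through each stage in order.
fn run_pipeline(pipeline: &[Stage], input: Vec<Doc>) -> Vec<Doc> {
    pipeline.iter().fold(input, |docs, stage| stage(docs))
}

// Made-up sample data standing in for the movies collection.
fn sample_docs() -> Vec<Doc> {
    vec![
        ("A Star Is Born".to_owned(), 1976),
        ("Midnight".to_owned(), 1939),
        ("A Star Is Born".to_owned(), 1937),
        ("A Star Is Born".to_owned(), 1954),
    ]
}

fn main() {
    // Each stage gets a name, and the pipeline is just the list of stages.
    let stage_match = match_title("A Star Is Born");
    let stage_sort = sort_year_ascending();
    let pipeline = vec![stage_match, stage_sort];

    for (title, year) in run_pipeline(&pipeline, sample_docs()) {
        println!("* {}, {}", title, year);
    }
}
```

Because the pipeline is only a list, adding, removing, or reordering stages is a one-line change, which is the same benefit you get from keeping each doc! stage in its own variable.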
Imagine I wanted to obtain the most recent production of "A Star Is Born" from the movies collection.
This can be thought of as three stages, executed in order:
- Obtain the movie documents for "A Star Is Born."
- Sort by year, descending.
- Discard all but the first document.
The first stage is already the same as stage_match_title above. The second stage is the same as stage_sort_year_ascending, but with the value 1 changed to -1. The third stage is a $limit stage.

The modified and new code looks like this:

    // Sort by year, descending:
    let stage_sort_year_descending = doc! {
        "$sort": {
            "year": -1
        }
    };

    // Limit to 1 document:
    let stage_limit_1 = doc! { "$limit": 1 };

    let pipeline = vec![stage_match_title, stage_sort_year_descending, stage_limit_1];

If you make the changes above and run your code, you should see just the following line:

    * A Star Is Born, Barbra Streisand, 1976

Wait a minute! Why isn't there a document for the amazing production with Lady Gaga and Bradley Cooper?
Why don't you use the skills you've just learned to find the latest date in the collection? That will give you your answer!
So, now you know how to filter, sort, and limit the contents of a collection using an aggregation pipeline. But these are just operations you can already do with find()! Why would you want to use these complex, new-fangled aggregation pipelines?

Read on, my friend, and I will show you the true power of MongoDB aggregation pipelines.
There's a dirty secret hiding in the sample_mflix database. As well as the movies collection, there's also a collection called comments. Documents in the comments collection look like this:

    {
        '_id': ObjectId('5a9427648b0beebeb69579d3'),
        'movie_id': ObjectId('573a1390f29313caabcd4217'),
        'date': datetime.datetime(1983, 4, 27, 20, 39, 15),
        'email': 'cameron_duran@fakegmail.com',
        'name': 'Cameron Duran',
        'text': 'Quasi dicta culpa asperiores quaerat perferendis neque. Est animi '
                'pariatur impedit itaque exercitationem.'}

It's a comment for a movie. I'm not sure why people are writing Latin comments for these movies, but let's go with it. The second field, movie_id, corresponds to the _id value of a document in the movies collection.

So, it's a comment related to a movie!
Does MongoDB enable you to query movies and embed the related comments, like a JOIN in a relational database? Yes it does—with the $lookup stage.
I'll show you how to obtain related documents from another collection, and embed them in the documents from your primary collection.
First, modify the definition of the MovieSummary struct so that it has a comments field, loaded from a related_comments BSON field. Define a Comment struct that contains a subset of the data contained in a comments document.

    #[derive(Deserialize)]
    struct MovieSummary {
        title: String,
        cast: Vec<String>,
        year: i32,
        // Filled from the "related_comments" BSON field; empty if the field is missing:
        #[serde(default, rename = "related_comments")]
        comments: Vec<Comment>,
    }

    // Debug is needed to print the comments with {:?} below:
    #[derive(Debug, Deserialize)]
    struct Comment {
        email: String,
        name: String,
        text: String,
    }

Next, create a new pipeline from scratch, and start with the following:

    // Look up related documents in the 'comments' collection:
    let stage_lookup_comments = doc! {
        "$lookup": {
            "from": "comments",
            "localField": "_id",
            "foreignField": "movie_id",
            "as": "related_comments",
        }
    };

    // Limit to the first 5 documents:
    let stage_limit_5 = doc! { "$limit": 5 };

    let pipeline = vec![
        stage_lookup_comments,
        stage_limit_5,
    ];

    let mut results = movies.aggregate(pipeline, None).await?;
    // Loop through the results and print a summary and the comments:
    while let Some(result) = results.next().await {
        let doc: MovieSummary = bson::from_document(result?)?;
        println!("* {}, comments={:?}", doc, doc.comments);
    }

The stage I've called stage_lookup_comments is a $lookup stage. This $lookup stage will look up documents from the comments collection whose movie_id matches the _id of the movie passing through the stage. The matching comments are embedded in the movie document as an array, in a BSON field named related_comments.

I've added a $limit stage just to ensure that there's a reasonable amount of output without being overwhelming.

Now, execute the code.
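
If the $lookup options feel abstract, this plain-Rust sketch may help. It models the join on in-memory data: for every movie, collect the comments whose movie_id equals the movie's id. Everything in it is an assumption for illustration only: the struct and function names are made up, and plain u32 ids stand in for the ObjectId values used in the real collections.

```rust
// Plain-Rust model of the $lookup stage; no database is involved.
// u32 ids are a simplification of the ObjectId values in the real data.
#[derive(Debug, Clone, PartialEq)]
struct Comment {
    movie_id: u32,
    name: String,
}

#[derive(Debug, Clone, PartialEq)]
struct Movie {
    id: u32,
    title: String,
}

#[derive(Debug, PartialEq)]
struct MovieWithComments {
    title: String,
    related_comments: Vec<Comment>,
}

// Models: from = comments, localField = _id, foreignField = movie_id,
// as = related_comments.
fn lookup_comments(movies: &[Movie], comments: &[Comment]) -> Vec<MovieWithComments> {
    movies
        .iter()
        .map(|movie| MovieWithComments {
            title: movie.title.clone(),
            // Scan every comment and keep the ones that point at this movie:
            related_comments: comments
                .iter()
                .filter(|c| c.movie_id == movie.id)
                .cloned()
                .collect(),
        })
        .collect()
}

// Made-up sample data: two movies, and two comments on the first one.
fn sample_data() -> (Vec<Movie>, Vec<Comment>) {
    let movies = vec![
        Movie { id: 1, title: "Midnight".to_owned() },
        Movie { id: 2, title: "A Star Is Born".to_owned() },
    ];
    let comments = vec![
        Comment { movie_id: 1, name: "Sansa Stark".to_owned() },
        Comment { movie_id: 1, name: "Theon Greyjoy".to_owned() },
    ];
    (movies, comments)
}

fn main() {
    let (movies, comments) = sample_data();
    for doc in lookup_comments(&movies, &comments) {
        println!("* {}, comments={:?}", doc.title, doc.related_comments);
    }
}
```

Notice that a movie with no matching comments still comes out of the stage, just with an empty related_comments array. Also notice that this naive model walks the whole comments list once per movie.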
You may notice that the pipeline above runs pretty slowly! There are two reasons for this:
- There are 23.5k movie documents and 50k comments.
- There's a missing index on the comments collection. It's missing on purpose, to teach you about indexes!
I'm not going to show you how to fix the index problem right now. I'll write about that in a later post in this series, focusing on indexes. Instead, I'll show you a trick for working with slow aggregation pipelines while you're developing.
Working with slow pipelines is a pain while you're writing and testing the pipeline. But, if you put a temporary $limit stage at the start of your pipeline, it will make the query faster (although the results may be different because you're not running on the whole dataset).

When I was writing this pipeline, I had a first stage of { "$limit": 1000 }.

When you have finished crafting the pipeline, you can comment out the first stage so that the pipeline will now run on the whole collection. Don't forget to remove the first stage, or you're going to get the wrong results!
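
To make the "results may be different" warning concrete, here is a small plain-Rust sketch that compares a filter over the whole dataset with the same filter placed after a temporary limit. The function names and the list of years are made up for illustration; the iterator methods take and filter are standing in for $limit and $match.

```rust
// Models a pipeline with a single filtering stage, run over every document.
fn full_pipeline(years: &[i32], min_year: i32) -> Vec<i32> {
    years.iter().copied().filter(|y| *y > min_year).collect()
}

// The same pipeline with a temporary "$limit" stage placed first:
// only the first `limit` documents ever reach the filter.
fn limited_pipeline(years: &[i32], limit: usize, min_year: i32) -> Vec<i32> {
    years
        .iter()
        .copied()
        .take(limit)
        .filter(|y| *y > min_year)
        .collect()
}

fn main() {
    // Made-up release years standing in for a collection.
    let years = [1937, 1999, 1954, 2005, 1976, 2013];

    // Faster while developing, but it only sees part of the data:
    println!("limited: {:?}", limited_pipeline(&years, 3, 1990));
    // What you actually want once the pipeline is finished:
    println!("full:    {:?}", full_pipeline(&years, 1990));
}
```

The limited version does less work, but it silently misses every match that sits beyond the first few documents, which is exactly why the temporary stage has to go before you trust the output.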
The aggregation pipeline above will print out summaries of five movie documents. I expect that most or all of your movie summaries will end with this: comments=[].

If you're lucky, you may have some documents in the array, but it's unlikely, as most of the movies have no comments. Now, I'll show you how to add some stages to match only movies which have more than two comments.
Ideally, you'd be able to add a single $match stage which obtained the length of the related_comments field and matched it against the expression { "$gt": 2 }. In this case, it's actually two steps:
- Add a field (I'll call it comment_count) containing the length of the related_comments field.
- Match where the value of comment_count is greater than two.
Here is the code for the two stages:

    // Calculate the number of comments for each movie:
    let stage_add_comment_count = doc! {
        "$addFields": {
            "comment_count": {
                "$size": "$related_comments"
            }
        }
    };

    // Match movie documents with more than 2 comments:
    let stage_match_with_comments = doc! {
        "$match": {
            "comment_count": {
                "$gt": 2
            }
        }
    };

The two stages go after the $lookup stage, and before the $limit 5 stage:

    let pipeline = vec![
        stage_lookup_comments,
        stage_add_comment_count,
        stage_match_with_comments,
        stage_limit_5,
    ];

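
Here is a rough plain-Rust model of the comment_count steps, in case it helps to see the computed field and the filter as two separate transformations. The types and function names are hypothetical, and the comments are reduced to plain strings to keep the sketch short.

```rust
// Plain-Rust model of $addFields followed by $match; all names are hypothetical.
#[derive(Debug, Clone, PartialEq)]
struct Movie {
    title: String,
    related_comments: Vec<String>,
}

// The original document plus the newly computed field.
#[derive(Debug, Clone, PartialEq)]
struct MovieWithCount {
    movie: Movie,
    comment_count: usize,
}

// Models { "$addFields": { "comment_count": { "$size": "$related_comments" } } }.
fn add_comment_count(movies: Vec<Movie>) -> Vec<MovieWithCount> {
    movies
        .into_iter()
        .map(|movie| MovieWithCount {
            comment_count: movie.related_comments.len(),
            movie,
        })
        .collect()
}

// Models { "$match": { "comment_count": { "$gt": 2 } } }.
fn match_more_than_two(docs: Vec<MovieWithCount>) -> Vec<MovieWithCount> {
    docs.into_iter().filter(|d| d.comment_count > 2).collect()
}

// Helper that builds a movie with `comments` placeholder comments.
fn movie(title: &str, comments: usize) -> Movie {
    Movie {
        title: title.to_owned(),
        related_comments: vec!["(comment text)".to_owned(); comments],
    }
}

fn main() {
    let movies = vec![movie("Midnight", 3), movie("A Star Is Born", 0), movie("F/X", 2)];
    for doc in match_more_than_two(add_comment_count(movies)) {
        println!("* {} ({} comments)", doc.movie.title, doc.comment_count);
    }
}
```

The first function never drops a document and the second never changes one, which mirrors the split between $addFields and $match.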
While I'm here, I'm going to clean up the output of this code to format the comments slightly better:

    let mut results = movies.aggregate(pipeline, None).await?;
    // Loop through the results and print a summary and the comments:
    while let Some(result) = results.next().await {
        let doc: MovieSummary = bson::from_document(result?)?;
        println!("* {}", doc);
        if !doc.comments.is_empty() {
            // Print a max of 5 comments per movie:
            for comment in doc.comments.iter().take(5) {
                println!(
                    " - {} <{}>: {}",
                    comment.name,
                    comment.email,
                    comment.text.chars().take(60).collect::<String>(),
                );
            }
        } else {
            println!(" - No comments");
        }
    }

Now when you run this code, you should see something more like this:

    * Midnight, Claudette Colbert, 1939
     - Sansa Stark <sansa_stark@fakegmail.com>: Error ex culpa dignissimos assumenda voluptates vel. Qui inventore
     - Theon Greyjoy <theon_greyjoy@fakegmail.com>: Animi dolor minima culpa sequi voluptate. Possimus necessitatibu
     - Donna Smith <donna_smith@fakegmail.com>: Et esse nulla ducimus tempore aliquid. Suscipit iste dignissimos v

It's good to see Sansa Stark from Game of Thrones really knows her Latin, isn't it?
Now I've shown you how to work with lookups in your pipelines, I'll show you how to use the $group stage to do actual aggregation.

I'll start with a new pipeline again.

The $group stage is one of the more difficult stages to understand, so I'll break this down slowly.

Start with the following code:

    // Define a struct to hold grouped data by year.
    // Debug is needed for {:?}; serde(default) fills in fields the pipeline doesn't emit yet.
    #[derive(Debug, Deserialize)]
    struct YearSummary {
        _id: i32,
        #[serde(default)]
        movie_count: i64,
        #[serde(default)]
        movie_titles: Vec<String>,
    }

    // Some movies have "year" values ending with 'è'.
    // This stage will filter them out:
    let stage_filter_valid_years = doc! {
        "$match": {
            "year": {
                "$type": "number",
            }
        }
    };

    /*
     * Group movies by year, producing 'year-summary' documents that look like:
     * {
     *     '_id': 1917,
     * }
     */
    let stage_group_year = doc! {
        "$group": {
            "_id": "$year",
        }
    };

    let pipeline = vec![stage_filter_valid_years, stage_group_year];

    // Run the pipeline to get a cursor over the 'year-summary' documents:
    let mut results = movies.aggregate(pipeline, None).await?;
    // Loop through the yearly summaries and print their debug representation:
    while let Some(result) = results.next().await {
        let doc: YearSummary = bson::from_document(result?)?;
        println!("* {:?}", doc);
    }

In the movies collection, some of the years contain the "è" character. This database has some messy values in it. In this case, there's only a small handful of documents, and I think we should just remove them, so I've added a $match stage that filters out any documents with a year that's not numeric.

Execute this code, and you should see something like this:

    * YearSummary { _id: 1959, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1980, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1977, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1933, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1998, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1922, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1948, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1965, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1950, movie_count: 0, movie_titles: [] }
    * YearSummary { _id: 1968, movie_count: 0, movie_titles: [] }
    ...

Each line is a document emitted from the aggregation pipeline. But you're not looking at movie documents anymore. The $group stage groups input documents by the specified _id expression and outputs one document for each unique _id value. In this case, the expression is $year, which means one document will be emitted for each unique value of the year field. Each document emitted can (and usually will) also contain values generated from aggregating data from the grouped documents. Currently, the YearSummary documents are using the default values for movie_count and movie_titles. Let's fix that.

Change the stage definition to the following:

    let stage_group_year = doc! {
        "$group": {
            "_id": "$year",
            // Count the number of movies in the group:
            "movie_count": { "$sum": 1 },
        }
    };

This will add a movie_count field, containing the result of adding 1 for every document in the group. In other words, it counts the number of movie documents in the group. If you run the code now, you should see something like the following:

    * YearSummary { _id: 2005, movie_count: 758, movie_titles: [] }
    * YearSummary { _id: 1999, movie_count: 542, movie_titles: [] }
    * YearSummary { _id: 1943, movie_count: 36, movie_titles: [] }
    * YearSummary { _id: 1926, movie_count: 9, movie_titles: [] }
    * YearSummary { _id: 1935, movie_count: 40, movie_titles: [] }
    * YearSummary { _id: 1966, movie_count: 116, movie_titles: [] }
    * YearSummary { _id: 1971, movie_count: 116, movie_titles: [] }
    * YearSummary { _id: 1952, movie_count: 58, movie_titles: [] }
    * YearSummary { _id: 2013, movie_count: 1221, movie_titles: [] }
    * YearSummary { _id: 1912, movie_count: 2, movie_titles: [] }
    ...

There are a number of accumulator operators, like $sum, that allow you to summarize data from the group. If you wanted to build an array of all the movie titles in the emitted document, you could add "movie_titles": { "$push": "$title" }, to the $group stage. In that case, you would get YearSummary instances that look like this:

    * YearSummary { _id: 1986, movie_count: 206, movie_titles: ["Defense of the Realm", "F/X", "Mala Noche", "Witch from Nepal", ... ]}

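
Since $group is the stage people find hardest, here is a plain-Rust sketch of what the two accumulators are doing, using a HashMap keyed by year. It is a model under my own assumptions rather than anything the server actually runs: group_by_year and this cut-down YearSummary (without the _id field, which becomes the map key) are hypothetical.

```rust
use std::collections::HashMap;

// One summary per group; the group key (the year) lives in the map, not the struct.
#[derive(Debug, Default, PartialEq)]
struct YearSummary {
    movie_count: i64,
    movie_titles: Vec<String>,
}

// Models a $group stage with "_id": "$year" and two accumulators.
fn group_by_year(movies: &[(&str, i32)]) -> HashMap<i32, YearSummary> {
    let mut groups: HashMap<i32, YearSummary> = HashMap::new();
    for (title, year) in movies {
        // Find the group for this year, creating an empty one if needed:
        let summary = groups.entry(*year).or_default();
        // "movie_count": { "$sum": 1 } adds 1 for every document in the group:
        summary.movie_count += 1;
        // "movie_titles": { "$push": "$title" } appends each title to an array:
        summary.movie_titles.push((*title).to_owned());
    }
    groups
}

fn main() {
    // Made-up input standing in for the movies collection.
    let movies = [("F/X", 1986), ("Midnight", 1939), ("Mala Noche", 1986)];
    for (year, summary) in group_by_year(&movies) {
        println!("* {}: {:?}", year, summary);
    }
}
```

Like the output of the real stage shown earlier, the groups come out of a HashMap in no particular order.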
Add the following stage to sort the results:

    let stage_sort_year_ascending = doc! {
        "$sort": {"_id": 1}
    };

    let pipeline = vec![
        stage_filter_valid_years, // Match numeric years
        stage_group_year,
        stage_sort_year_ascending, // Sort by year (which is the unique _id field)
    ];

Note that the $match stage is added to the start of the pipeline, and the $sort is added to the end. A general rule is that you should filter documents out early in your pipeline, so that later stages have fewer documents to deal with. It also ensures that the pipeline is more likely to be able to take advantage of any appropriate indexes assigned to the collection.

Remember, all of the sample code for this quick start series can be found on GitHub.
Aggregations using $group are a great way to discover interesting things about your data. In this example, I'm illustrating the number of movies made each year, but it would also be interesting to see information about movies for each country, or even look at the movies made by different actors.

You've learned how to construct aggregation pipelines to filter, group, and join documents with other collections. You've hopefully learned that putting a $limit stage at the start of your pipeline can be useful to speed up development (but should be removed before going to production). You've also learned some basic optimization tips, like putting filtering expressions towards the start of your pipeline instead of towards the end.

As you've gone through, you'll probably have noticed that there's a ton of different stage types, operators, and accumulator operators. Learning how to use the different components of aggregation pipelines is a big part of learning to use MongoDB effectively as a developer.
I love working with aggregation pipelines, and I'm always surprised at what you can do with them!
Aggregation pipelines are super powerful, and because of this, they're a big topic to cover. Check out the full documentation to get a better idea of their full scope.
Note that aggregation pipelines can also be used to generate new data and write it back into a collection, with the $out stage.
MongoDB provides a free GUI tool called Compass. It allows you to connect to your MongoDB cluster, so you can browse through databases and analyze the structure and contents of your collections. It includes an aggregation pipeline builder which makes it easier to build aggregation pipelines. I highly recommend you install it, or if you're using MongoDB Atlas, use its similar aggregation pipeline builder in your browser. I often use them to build aggregation pipelines, and they include export buttons which will export your pipeline as Python code (which isn't too hard to transform into Rust).
I don't know about you, but when I was looking at some of the results above, I thought to myself, "It would be fun to visualise this with a chart." MongoDB provides a hosted service called Charts which just happens to take aggregation pipelines as input. So, now's a good time to give it a try!