Introdução a pipelines de agregação em Python
Mark Smith14 min read • Published Feb 05, 2022 • Updated Oct 01, 2024
Avalie esse Início rápido
Os pipelines de agregação do MongoDB são um de seus recursos mais avançados. Eles permitem escrever expressões, divididas em uma série de estágios, que executam operações, incluindo agregação, transformações e junções nos dados em seus MongoDB databases. Isso permite fazer cálculos e análises entre documentos e coleções dentro do MongoDB database.
Este início rápido é o segundo de uma série de postagens sobre Python. É altamente recomendável que você comece com minha primeira postagem, Operações básicas do MongoDB no Python, que mostrará como configurar corretamente um cluster gratuito de banco de dados MongoDB Atlas contendo os dados de amostra com os quais você trabalhará aqui. Vá ler e volte. Vou aguardar. Sem ele, você não terá o banco de dados configurado corretamente para executar o código neste guia de início rápido.
Em resumo, você vai precisar de:
- Uma versão atualizada do Python 3. Escrevi o código neste tutorial no Python 3.8, mas deve funcionar bem na versão 3.6+.
- Um cluster do MongoDB contendo o conjunto de dados
sample_mflix
. Você pode encontrar instruções para configurar isso na primeira publicação no blog desta série.
Os pipelines de agregação do MongoDB são muito poderosos e, portanto, podem parecer um pouco complexos no início. Por esse motivo, começarei devagar. Primeiro, mostrarei como criar um pipeline que duplica o comportamento que você já pode obter com queries MQL, usando o método
find()
do PyMongo, mas em vez disso usando um pipeline de agregação com os estágios $match
, $sort
e $limit
. Em seguida, mostrarei como fazer queries que vão além da MQL, demonstrando o uso do $lookup
para incluir documentos relacionados de outra coleção. Por fim, colocarei a "agregação" no "pipeline de agregação" mostrando como usar $group
para agrupar documentos e formar novos resumos de documentos.Todo o código de amostra desta série de início rápido pode ser encontrado no GitHub. Recomendamos que você confira se tiver dúvidas, caso contrário, vale a pena seguir o tutorial e escrever o código você mesmo!
Todos os pipelines nesta publicação serão executados na coleção sample_mflix do banco de dados
movies
. Ele contém documentos que se parecem com isto:1 { 2 '_id': ObjectId('573a1392f29313caabcdb497'), 3 'awards': {'nominations': 7, 4 'text': 'Won 1 Oscar. Another 2 wins & 7 nominations.', 5 'wins': 3}, 6 'cast': ['Janet Gaynor', 'Fredric March', 'Adolphe Menjou', 'May Robson'], 7 'countries': ['USA'], 8 'directors': ['William A. Wellman', 'Jack Conway'], 9 'fullplot': 'Esther Blodgett is just another starry-eyed farm kid trying to ' 10 'break into the movies. Waitressing at a Hollywood party, she ' 11 'catches the eye of alcoholic star Norman Maine, is given a test, ' 12 'and is caught up in the Hollywood glamor machine (ruthlessly ' 13 'satirized). She and her idol Norman marry; but his career ' 14 'abruptly dwindles to nothing', 15 'genres': ['Drama'], 16 'imdb': {'id': 29606, 'rating': 7.7, 'votes': 5005}, 17 'languages': ['English'], 18 'lastupdated': '2015-09-01 00:55:54.333000000', 19 'plot': 'A young woman comes to Hollywood with dreams of stardom, but ' 20 'achieves them only with the help of an alcoholic leading man whose ' 21 'best days are behind him.', 22 'poster': 'https://m.media-amazon.com/images/M/MV5BMmE5ODI0NzMtYjc5Yy00MzMzLTk5OTQtN2Q3MzgwOTllMTY3XkEyXkFqcGdeQXVyNjc0MzMzNjA@._V1_SY1000_SX677_AL_.jpg', 23 'rated': 'NOT RATED', 24 'released': datetime.datetime(1937, 4, 27, 0, 0), 25 'runtime': 111, 26 'title': 'A Star Is Born', 27 'tomatoes': {'critic': {'meter': 100, 'numReviews': 11, 'rating': 7.4}, 28 'dvd': datetime.datetime(2004, 11, 16, 0, 0), 29 'fresh': 11, 30 'lastUpdated': datetime.datetime(2015, 8, 26, 18, 58, 34), 31 'production': 'Image Entertainment Inc.', 32 'rotten': 0, 33 'viewer': {'meter': 79, 'numReviews': 2526, 'rating': 3.6}, 34 'website': 'http://www.vcientertainment.com/Film-Categories?product_id=73'}, 35 'type': 'movie', 36 'writers': ['Dorothy Parker (screen play)', 37 'Alan Campbell (screen play)', 38 'Robert Carson (screen play)', 39 'William A. Wellman (from a story by)', 40 'Robert Carson (from a story by)'], 41 'year': 1937}
Há muitos dados lá, mas vou me concentrar principalmente nos campos
_id
, title
, year
e cast
. O primeiro argumento para
aggregate()
é uma sequência de estágios de pipeline a serem executados. Assim como uma query, cada estágio de um pipeline de agregação é um documento BSON, e o PyMongo converte automaticamente um dict
em um documento BSON para você.Um pipeline de agregação opera em todos os dados de uma coleção. Cada estágio do pipeline é aplicado aos documentos que passam, e quaisquer documentos emitidos de um estágio são passados como entrada para o próximo, até que não haja mais estágios restantes. Nesse ponto, os documentos emitidos do último estágio do pipeline são retornados ao programa do cliente, de forma semelhante a uma chamada para
find()
.Etapas individuais, como
$match
, podem atuar como um filtro, para passar apenas por documentos que correspondam a determinados critérios. Outros tipos de estágio, como $project
,$addFields
e $lookup
modificam o conteúdo de documentos individuais à medida que passam pelo pipeline. Finalmente, certos tipos de estágios, como $group
, criam um conjunto inteiramente novo de documentos com base nos documentos que foram passados para ele como um todo. Nenhum desses estágios altera os dados armazenados no próprio MongoDB. Eles apenas alteram os dados antes de devolvê-los ao seu programa! Há um estágio, $set, que pode salvar os resultados de um pipeline de volta ao MongoDB, mas não vou abordá-lo neste início rápido.Estou supondo que você trabalhe no mesmo ambiente usado na última publicação, então você já deve ter o PyMongo e o python-dotenv instalados, e deve ter um arquivo
.env
contendo sua variável de ambiente MONGODB_URI
.Primeiro, cole o seguinte em seu código Python:
1 import os 2 from pprint import pprint 3 4 import bson 5 from dotenv import load_dotenv 6 import pymongo 7 8 # Load config from a .env file: 9 load_dotenv(verbose=True) 10 MONGODB_URI = os.environ["MONGODB_URI"] 11 12 # Connect to your MongoDB cluster: 13 client = pymongo.MongoClient(MONGODB_URI) 14 15 # Get a reference to the "sample_mflix" database: 16 db = client["sample_mflix"] 17 18 # Get a reference to the "movies" collection: 19 movie_collection = db["movies"]
O código acima fornece uma variável global, um objeto de coleção chamado
movie_collection
, que aponta para a coleção movies
em seu banco de dados.Aqui está um código que cria um pipeline, executa-o com
aggregate
e, em seguida, faz um loop e imprime os detalhes de cada filme nos resultados. Cole-o em seu programa.1 pipeline = [ 2 { 3 "$match": { 4 "title": "A Star Is Born" 5 } 6 }, 7 { 8 "$sort": { 9 "year": pymongo.ASCENDING 10 } 11 }, 12 ] 13 results = movie_collection.aggregate(pipeline) 14 for movie in results: 15 print(" * {title}, {first_castmember}, {year}".format( 16 title=movie["title"], 17 first_castmember=movie["cast"][0], 18 year=movie["year"], 19 ))
Este pipeline tem dois estágios. O primeiro é um estágio $match, que é semelhante a fazer query de uma coleção com
find()
. Ele filtra os documentos que passam pelo estágio com base em uma query MQL. Por ser o primeiro estágio do pipeline, sua entrada são todos os documentos da coleção movie
. A query MQL para o estágio $match
é filtrada no campo title
dos documentos de entrada, portanto, os únicos documentos gerados desse estágio terão o título de "A Star Is Born".O segundo estágio é um estágio $sort. Somente os documentos do filme "Nasce uma Estrela" são passados para esse estágio, de modo que o resultado será todos os filmes chamados "Nasce uma Estrela", agora classificados pelo campo de ano, com o filme mais antigo primeiro.
As chamadas para aggregate() retornam um cursor apontando para os documentos resultantes. Pode-se fazer um loop do cursor como qualquer outra sequência. O código acima percorre todos os documentos retornados e imprime um pequeno resumo, consistindo no título, no primeiro ator na array
cast
e no ano em que o filme foi produzido.A execução do código acima resulta em:
1 * A Star Is Born, Janet Gaynor, 1937 2 * A Star Is Born, Judy Garland, 1954 3 * A Star Is Born, Barbra Streisand, 1976
É possível criar pipelines de agregação inteiros como uma única estrutura de dados, como no exemplo acima, mas não é necessariamente uma boa ideia. Os pipelines podem ficar longos e complexos. Por esse motivo, recomendo que você crie cada estágio do seu pipeline como uma variável separada e, em seguida, combine os estágios em um pipeline no final, assim:
1 # Match title = "A Star Is Born": 2 stage_match_title = { 3 "$match": { 4 "title": "A Star Is Born" 5 } 6 } 7 8 # Sort by year, ascending: 9 stage_sort_year_ascending = { 10 "$sort": { "year": pymongo.ASCENDING } 11 } 12 13 # Now the pipeline is easier to read: 14 pipeline = [ 15 stage_match_title, 16 stage_sort_year_ascending, 17 ]
Imagine que eu quisesse obter a produção mais recente de "A Star Is Born" da coleção de filmes.
Isso pode ser pensado como três etapas, executadas em ordem:
- Obtenha os documentos do filme "A Star Is Born".
- Classificado por ano, ordem decrescente.
- Descarte todos, exceto o primeiro documento.
O primeiro estágio já é o mesmo que
stage_match_title
acima. O segundo estágio é o mesmo que stage_sort_year_ascending
, mas com pymongo.ASCENDING
alterado para pymongo.DESCENDING
. O terceiro estágio é um estágio $limit.O código modificado e novo fica assim:
1 # Sort by year, descending: 2 stage_sort_year_descending = { 3 "$sort": { "year": pymongo.DESCENDING } 4 } 5 6 # Limit to 1 document: 7 stage_limit_1 = { "$limit": 1 } 8 9 pipeline = [ 10 stage_match_title, 11 stage_sort_year_descending, 12 stage_limit_1, 13 ]
Se você fizer as alterações acima e executar seu código, verá apenas a seguinte linha:
1 * A Star Is Born, Barbra Streisand, 1976
Espere um pouco! Por que não há um documento para a maravilhosa produção com Lady Gaga e Bradley Cooper?
Espere um pouco! Você encontrará a resposta para esse mistério, e muito mais, mais adiante nesta postagem do blog.
Agora você já sabe filtrar, classificar e limitar o conteúdo de uma coleção usando um pipeline de agregação. Mas essas são apenas operações que você já pode fazer com
find()
! Por que você usaria esses pipelines de agregação complexos e novos?Continue lendo, meu caro, e mostrarei o verdadeiro poder dos pipelines de agregação do MongoDB.
Há um segredo sujo escondido no banco de dados
sample_mflix
. Além da coleção movies
, há também uma coleção chamada comments
. Os documentos da coleção comments
têm esta aparência:1 { 2 '_id': ObjectId('5a9427648b0beebeb69579d3'), 3 'movie_id': ObjectId('573a1390f29313caabcd4217'), 4 'date': datetime.datetime(1983, 4, 27, 20, 39, 15), 5 'email': 'cameron_duran@fakegmail.com', 6 'name': 'Cameron Duran', 7 'text': 'Quasi dicta culpa asperiores quaerat perferendis neque. Est animi ' 8 'pariatur impedit itaque exercitationem.'}
É um comentário sobre um filme. Não sei por que as pessoas estão escrevendo comentários em latim para esses filmes, mas vamos em frente. O segundo campo,
movie_id,
, corresponde ao valor _id
de um documento na coleção movies
.Então, é um comentário relacionado a um filme!
O MongoDB permite que você consulte filmes e incorpore os comentários relacionados, como um JOIN em um banco de dados relacional? Sim! Com o estágio $lookup.
Mostrarei como obter documentos relacionados de outra coleção e incorporá-los nos documento da sua coleção principal. Primeiro, crie um novo pipeline do zero e comece com o seguinte:
1 # Look up related documents in the 'comments' collection: 2 stage_lookup_comments = { 3 "$lookup": { 4 "from": "comments", 5 "localField": "_id", 6 "foreignField": "movie_id", 7 "as": "related_comments", 8 } 9 } 10 11 # Limit to the first 5 documents: 12 stage_limit_5 = { "$limit": 5 } 13 14 pipeline = [ 15 stage_lookup_comments, 16 stage_limit_5, 17 ] 18 19 results = movie_collection.aggregate(pipeline) 20 for movie in results: 21 pprint(movie)
O estágio que chamei de
stage_lookup_comments
é um estágio $lookup
. Esse estágio $lookup
pesquisa documentos da coleçãocomments
que tenham o mesmo ID de filme. Os comentários correspondentes serão listados como uma array em um campo chamado "related_comments", com um valor de array contendo todos os comentários que têm o valor "_id" desse filme como "movie_id".Adicionei um estágio
$limit
apenas para garantir que haja uma quantidade razoável de saída sem ser excessiva.Agora, execute o código.
Você pode notar que o pipeline acima funciona bem devagar! Há duas razões para isso:
- Há 23,5k documentos de filmes e 50k comentários.
- Há um índice ausente na coleção
comments
. Ele não está lá de propósito para você aprender sobre índices!
Eu não mostrarei como resolver o problema do índice agora. Escreverei sobre isso em uma publicação posterior desta série, com foco em índices. Em vez disso, mostrarei um truque para trabalhar com pipelines de agregação lentos enquanto você está desenvolvendo.
Trabalhar com pipelines lentos é cansativo enquanto você escreve e testa o pipeline. Mas, se você colocar um estágio temporário
$limit
no início do seu pipeline, ele tornará a query mais rápida (embora os resultados possam ser diferentes porque você não está executando em todo o conjunto de dados).Quando estava escrevendo esse pipeline, eu tinha um primeiro estágio de
{ "$limit": 1000 }
.Quando terminar de criar o pipeline, você pode comentar a primeira etapa para que o pipeline agora seja executado em toda a coleção. Não se esqueça de remover o primeiro estágio, senão você obterá resultados errados!
O pipeline de agregação acima imprime todo o conteúdo de cinco documentos de filmes. São muitos dados, mas se você olhar com cuidado, verá que há um novo campo em cada documento que se parece com isto:
1 'related_comments': []
Se você tiver sorte, poderá ter alguns documentos na array, mas é improvável, pois a maioria dos filmes não tem comentários. Agora, mostrarei como adicionar alguns estágios para corresponder apenas aos filmes que têm mais de dois comentários.
O ideal é que você pudesse adicionar um único estágio
$match
que tivesse o comprimento do campo related_comments
e o comparasse com a expressão { "$gt": 2 }
. Neste caso, são na verdade duas etapas:- Adicione um campo (que chamarei de
comment_count
) contendo o comprimento do camporelated_comments
. - Correspondência onde o valor de
comment_count
é maior que dois.
Este é o código para os dois estágios:
1 # Calculate the number of comments for each movie: 2 stage_add_comment_count = { 3 "$addFields": { 4 "comment_count": { 5 "$size": "$related_comments" 6 } 7 } 8 } 9 10 # Match movie documents with more than 2 comments: 11 stage_match_with_comments = { 12 "$match": { 13 "comment_count": { 14 "$gt": 2 15 } 16 } 17 }
Os dois estágios ocorrem após o estágio
$lookup
e antes do $limit
5:1 pipeline = [ 2 stage_lookup_comments, 3 stage_add_comment_count, 4 stage_match_with_comments, 5 limit_5, 6 ]
Enquanto estiver aqui, vou limpar a saída desse código, em vez de usar
pprint
:1 results = movie_collection.aggregate(pipeline) 2 for movie in results: 3 print(movie["title"]) 4 print("Comment count:", movie["comment_count"]) 5 6 # Loop through the first 5 comments and print the name and text: 7 for comment in movie["related_comments"][:5]: 8 print(" * {name}: {text}".format( 9 name=comment["name"], 10 text=comment["text"]))
Agora, ao executar esse código, você deve ver algo mais ou menos assim:
1 Footsteps in the Fog 2 -------------------- 3 Comment count: 3 4 * Sansa Stark: Error ex culpa dignissimos assumenda voluptates vel. Qui inventore quae quod facere veniam quaerat quibusdam. Accusamus ab deleniti placeat non. 5 * Theon Greyjoy: Animi dolor minima culpa sequi voluptate. Possimus necessitatibus voluptatem hic cum numquam voluptates. 6 * Donna Smith: Et esse nulla ducimus tempore aliquid. Suscipit iste dignissimos voluptate velit. Laboriosam sequi quae fugiat similique alias. Corporis cumque labore veniam dignissimos.
É bom ver que Sansa Stark, de Game of Thrones, realmente sabe tudo de latim, não é?
Agora que mostrei como trabalhar com pesquisas em seus pipelines, mostrarei como usar o estágio
$group
para fazer a agregação real.Começarei com um novo pipeline mais uma vez.
O estágio
$group
é um dos estágios mais difíceis de entender, portanto, vou explicá-lo em detalhes.Comece com o seguinte código:
1 # Group movies by year, producing 'year-summary' documents that look like: 2 # { 3 # '_id': 1917, 4 # } 5 stage_group_year = { 6 "$group": { 7 "_id": "$year", 8 } 9 } 10 11 pipeline = [ 12 stage_group_year, 13 ] 14 results = movie_collection.aggregate(pipeline) 15 16 # Loop through the 'year-summary' documents: 17 for year_summary in results: 18 pprint(year_summary)
Execute este código e você deverá ver algo assim:
1 {'_id': 1978} 2 {'_id': 1996} 3 {'_id': 1931} 4 {'_id': '2000è'} 5 {'_id': 1960} 6 {'_id': 1972} 7 {'_id': 1943} 8 {'_id': '1997è'} 9 {'_id': 2010} 10 {'_id': 2004} 11 {'_id': 1947} 12 {'_id': '1987è'} 13 {'_id': 1954} 14 ...
Cada linha é um documento emitido pelo pipeline de agregação. Mas você não está mais olhando para documentos de filmes. O estágio
$group
agrupa os documentos de entrada de acordo com a expressão _id
especificada e emite um documento para cada valor _id
exclusivo. Nesse caso, a expressão é $year
, o que significa que um documento será emitido para cada valor exclusivo do campo year
. Cada documento emitido pode (e geralmente conterá) também valores gerados pela agregação de dados dos documentos agrupados. Altere a definição de estágio para o seguinte:
1 stage_group_year = { 2 "$group": { 3 "_id": "$year", 4 # Count the number of movies in the group: 5 "movie_count": { "$sum": 1 }, 6 } 7 }
Isso adicionará um campo
movie_count
, que contém o resultado da adição de 1
para cada documento no grupo. Em outras palavras, ele conta o número de documentos de filmes no grupo. Se você executar o código agora, deverá ver algo assim:1 {'_id': '1997è', 'movie_count': 2} 2 {'_id': 2010, 'movie_count': 970} 3 {'_id': 1947, 'movie_count': 38} 4 {'_id': '1987è', 'movie_count': 1} 5 {'_id': 2012, 'movie_count': 1109} 6 {'_id': 1954, 'movie_count': 64} 7 ...
Há vários operadores acumuladores, como
$sum
, que permitem resumir dados do grupo. Se você quiser criar uma array de todos os títulos de filmes no documento emitido, poderá adicionar "movie_titles": { "$push": "$title" },
ao estágio $group
. Nesse caso, você obteria documentos assim:1 { 2 '_id': 1917, 3 'movie_count': 3, 4 'movie_titles': [ 5 'The Poor Little Rich Girl', 6 'Wild and Woolly', 7 'The Immigrant' 8 ] 9 }
Algo que você provavelmente notou na saída acima é que alguns dos anos contêm o caractere "è". Esse banco de dados tem alguns valores confusos. Nesse caso, há apenas um pequeno número de documentos, e acho que devemos simplesmente removê-los. Adicione os dois estágios a seguir para corresponder apenas aos documentos com um valor numérico
year
e para classificar os resultados:1 stage_match_years = { 2 "$match": { 3 "year": { 4 "$type": "number", 5 } 6 } 7 } 8 9 stage_sort_year_ascending = { 10 "$sort": {"_id": pymongo.ASCENDING} 11 } 12 13 pipeline = [ 14 stage_match_years, # Match numeric years 15 stage_group_year, 16 stage_sort_year_ascending, # Sort by year 17 ]
Note que o estágio
$match
é adicionado ao início do pipeline e o $sort
é adicionado ao final. Uma regra geral é que você deve filtrar documentos no início do pipeline, para que os estágios posteriores tenham menos documentos para lidar. Isso garante que o pipeline tenha mais chances de aproveitar quaisquer índices apropriados atribuídos à coleção.Lembre-se de que todo o código de amostra desta série de início rápido pode ser encontrado no GitHub.
As agregações usando
$group
são uma ótima maneira de descobrir fatos interessantes sobre seus dados. Neste exemplo, estou ilustrando o número de filmes feitos a cada ano, mas também seria interessante ver informações sobre os filmes de cada país ou mesmo ver os filmes feitos por atores diferentes.Você aprendeu a construir pipelines de agregação para filtrar, agrupar e unir documentos a outras coleções. Esperamos que você tenha aprendido que colocar um estágio
$limit
no início do pipeline pode ser útil para acelerar o desenvolvimento (mas deve ser removido antes de ir para a produção). Você também aprendeu algumas dicas básicas de otimização, como colocar expressões de filtragem no início do pipeline e não no final.Você deve ter notado que há uma tonelada de diferentes tipos de estágio, operadores e operadores de acumuladores. Aprender a usar os diferentes componentes dos pipelines de agregação é uma parte importante do aprendizado para usar o MongoDB efetivamente como desenvolvedor.
Adoro trabalhar com pipelines de agregação e sempre me surpreendo com o que dá para fazer com eles!
Os pipelines de agregação são superpoderosos e, por isso, são um grande tópico a ser abordado. Confira a documentação completa para ter uma ideia melhor de todo o seu escopo.
Observe que os pipelines de agregação também podem ser usados para gerar novos dados e gravá-los de volta em uma coleção, com o estágio $out.
O MongoDB fornece uma ferramenta de GUI gratuita chamada Compass. Ela permite que você se conecte ao MongoDB cluster para que possa navegar pelos banco de dados e analisar a estrutura e o conteúdo de suas coleções. Ela inclui um construtor de pipeline de agregação que facilita a criação de pipelines de agregação. É altamente recomendável instalá-lo ou, se estiver usando o MongoDB Atlas, use seu construtor de pipeline de agregação semelhante em seu navegador. Costumo usá-los para criar pipelines de agregação e eles incluem botões de exportação que exportarão seu pipeline como código Python.
Não sei você, mas quando vi alguns dos resultados acima, pensei que seria legal ver isso com um gráfico." O MongoDB fornece um serviço hospedado chamado Charts que por acaso aceita pipelines de agregação como entrada. Então, agora é uma boa hora para tentar!
Considero os pipelines de agregação uma das duas principais ferramentas do MongoDB, junto com Change Streams. Se você quiser saber mais sobre change streams, confira esta publicação no blog da minha talentosa colega, Naomi Pentrel.