PyMongoArrow: fazendo a ponte entre o MongoDB e seu aplicativo de análise de dados
Avalie esse Início rápido
O MongoDB sempre foi um ótimo banco de dados para ciência e análise de dados, e isso ocorre porque você pode:
- Importe dados sem um esquema fixo.
- Limpe-o no banco de dados.
- Ouça em tempo real para atualizações (um recurso muito útil que é usado pelo nosso MongoDB Kafka Connector).
Mas o MongoDB é um banco de dados de uso geral, e não uma ferramenta de análise de dados, portanto, um padrão comum ao analisar dados armazenados no MongoDB é extrair os resultados de uma consulta em uma matriz Numpy ou dataframe Pandas e executar análises complexas e potencialmente longas usando o kit de ferramentas que essas estruturas fornecem. Até recentemente, o impacto no desempenho da conversão de grandes quantidades de dados BSON, conforme fornecido pelo MongoDB nessas estruturas de dados, era mais lento do que gostaria.
Felizmente, o MongoDB lançou recentemente o PyMongoArrow, uma biblioteca Python para converter com eficiência o resultado de uma query do MongoDB no modelo de dadosApache Arrow . Se você não está ciente do Arrow, agora pode estar lendo: "Mark, como a conversão para o Apache Arrow me ajuda com minha análise do Numpy ou do Pandas?" A resposta é: a conversão entre Arrow, Numpy e Pandas é supereficiente, então fornece um formato intermediário útil para seus dados tabulares. Dessa forma, nos concentramos em criar uma ferramenta poderosa para mapeamento entre MongoDB e Arrow e aproveitar a bibliotecaPyArrowexistente para integração com Numpy e MongoDB
Você precisará de uma versão recente do Python (estou usando 3.8) com pip disponível. Você pode usar o conda se quiser, mas o PyMongoArrow é lançado no PyPI, então você ainda precisará usar o pip para instalá-lo em seu ambiente do conda Python.
Este tutorial foi escrito para PyMongoArrow v0.1.1.
Neste tutorial, usarei um banco de dados de exemplo que você pode instalar ao criar um cluster hospedado no MongoDB Atlas. O banco de dados que usarei é o banco de dados "sample_wetherdata". Você acessará isso com um URI
mongodb+srv
, então precisará instalar o PyMongo com o extra "srv", assim:1 python -m pip install jupyter pymongoarrow 'pymongo[srv]' pandas
Dica útil: Se você acabou de executar o
pip
, pode acabar usando uma cópia do pip
que foi instalada para uma versão diferente do python
da que você está usando. Por alguma razão, o PATH
ficar confuso dessa forma acontece com mais frequência do que você pensa. Uma solução para isso é executar pip via Python, com o comando python -m pip
. Dessa forma, ele sempre executará a versão do pip
associada à versão do python
em seu PATH
. Esta é agora a maneiraoficialmente recomendada de executar o pip
!Você também precisará de um cluster do MongoDB configurado com os conjuntos de dados de amostra importados. Siga estas instruções para importá-los para seu cluster do MongoDB e, em seguida, defina uma variável de ambiente,
MDB_URI
, apontando para seu banco de dados. Ela deve se parecer com a linha abaixo, mas com o URI que você copiou da interface da Web do Atlas. (Clique no botão "Connect" para seu cluster).1 export MDB_URI=mongodb+srv://USERNAME:PASSWORD@CLUSTERID.azure.mongodb.net/sample_weatherdata?retryWrites=true&w=majority
Um exemplo de documento da coleção "data" é algo parecido com o seguinte:
1 {'_id': ObjectId('5553a998e4b02cf7151190bf'), 2 'st': 'x+49700-055900', 3 'ts': datetime.datetime(1984, 3, 5, 15, 0), 4 'position': {'type': 'Point', 'coordinates': [-55.9, 49.7]}, 5 'elevation': 9999, 6 'callLetters': 'SCGB', 7 'qualityControlProcess': 'V020', 8 'dataSource': '4', 9 'type': 'FM-13', 10 'airTemperature': {'value': -5.1, 'quality': '1'}, 11 'dewPoint': {'value': 999.9, 'quality': '9'}, 12 'pressure': {'value': 1020.8, 'quality': '1'}, 13 'wind': {'direction': {'angle': 100, 'quality': '1'}, 14 'type': 'N', 15 'speed': {'rate': 3.1, 'quality': '1'}}, 16 'visibility': {'distance': {'value': 20000, 'quality': '1'}, 17 'variability': {'value': 'N', 'quality': '9'}}, 18 'skyCondition': {'ceilingHeight': {'value': 22000, 19 'quality': '1', 20 'determination': 'C'}, 21 'cavok': 'N'}, 22 'sections': ['AG1', 'AY1', 'GF1', 'MD1', 'MW1'], 23 'precipitationEstimatedObservation': {'discrepancy': '2', 24 'estimatedWaterDepth': 0}, 25 'pastWeatherObservationManual': [{'atmosphericCondition': {'value': '0', 26 'quality': '1'}, 27 'period': {'value': 3, 'quality': '1'}}], 28 'skyConditionObservation': {'totalCoverage': {'value': '01', 29 'opaque': '99', 30 'quality': '1'}, 31 'lowestCloudCoverage': {'value': '01', 'quality': '1'}, 32 'lowCloudGenus': {'value': '01', 'quality': '1'}, 33 'lowestCloudBaseHeight': {'value': 800, 'quality': '1'}, 34 'midCloudGenus': {'value': '00', 'quality': '1'}, 35 'highCloudGenus': {'value': '00', 'quality': '1'}}, 36 'atmosphericPressureChange': {'tendency': {'code': '8', 'quality': '1'}, 37 'quantity3Hours': {'value': 0.5, 'quality': '1'}, 38 'quantity24Hours': {'value': 99.9, 'quality': '9'}}, 39 'presentWeatherObservationManual': [{'condition': '02', 'quality': '1'}]}
Para simplificar as coisas neste tutorial, ignorarei todos os campos, exceto "ts", "wind" e o campo "_id".
Configurei a variável de ambiente
MDB_URI
, instalei as dependências acima e, em seguida, liguei um novo Python 3 Jupyter Notebook. Coloquei o bloco de anotações no GitHub, se você quiser acompanhar ou executar você mesmo.Adicionei o seguinte código a uma célula na parte superior do arquivo para importar os módulos necessários e conectar-me ao meu banco de dados:
1 import os 2 import pyarrow 3 import pymongo 4 import bson 5 import pymongoarrow.monkey 6 from pymongoarrow.api import Schema 7 8 MDB_URI = os.environ['MDB_URI'] 9 10 11 # Add extra find_* methods to pymongo collection objects: 12 pymongoarrow.monkey.patch_all() 13 14 client = pymongo.MongoClient(MDB_URI) 15 database = client.get_default_database() 16 collection = database.get_collection("data")
Se os dados que você deseja converter em tabelas de dados Arrow, Pandas ou Numpy já estiverem simples — ou seja, os campos estiverem todos no nível superior de seus documentos — você poderá usar os métodos
find\_arrow\_all
, find\_pandas\_all
e find\_numpy\_all
para consultar sua collection e retornar a estrutura de dados apropriada.1 collection.find_pandas_all( 2 {}, 3 schema=Schema({ 4 'ts': pyarrow.timestamp('ms'), 5 }) 6 )
ts | |
---|---|
0 | 1984-03-05 15:00:00 |
1 | 1984-03-05 18:00:00 |
2 | 1984-03-05 18:00:00 |
3 | 1984-03-05 18:00:00 |
4 | 1984-03-05 18:00:00 |
... | ... |
9995 | 1984-03-13 06:00:00 |
9996 | 1984-03-13 06:00:00 |
9997 | 1984-03-13 06:00:00 |
9998 | 1984-03-12 09:00:00 |
9999 | 1984-03-12 12:00:00 |
10000 linhas × 1 colunas
O primeiro argumento para find_pandas_all é o argumento
filter
. Estou interessado em todos os documentos da coleção, portanto, deixei-o vazio. Os documentos na coleção de dados são bastante aninhados, portanto, o único valor real que posso acessar com uma consulta de localização é o carimbo de data/hora de quando os dados foram registrados, o campo "ts". Não se preocupe - mostrarei como acessar o restante dos dados em breve!Como as tabelas Arrow (e os outros tipos de dados) são fortemente tipadas, você também precisará fornecer um esquema para mapear do esquema dinâmico permissivo do MongoDB para os tipos que deseja manipular em sua estrutura de dados na memória.
O
Schema
é um mapeamento do nome do campo, para o tipo apropriado a ser usado por Arrow, Pandas ou Numpy. No momento atual, esses tipos são 64ints de bits, 64números de ponto flutuante de bits e data/hora. A maneira mais fácil de especificá-los é com os tipos de python nativos int
e float
, e com pyarrow.datetime
. Quaisquer campos no documento que não estejam listados no esquema serão ignorados.Atualmente, o PyMongoArrow redireciona o parâmetro
projection
para os métodosfind_*_all
; portanto, não é possível escrever uma projeção para nivelar a estrutura no momento.Os documentos do MongoDB são muito flexíveis e podem suportar arrays e documentos aninhados. Embora o Apache Arrow também ofereça suporte a listas, estruturas e dicionários aninhados, os arrays Numpy e os dataframes do Pandas, por outro lado, são estruturas de dados tabulares ou colunares. Existem planos para oferecer suporte ao mapeamento para os tipos de dados Arrow aninhados no futuro, mas, no momento, apenas valores escalares são suportados com todas as três bibliotecas. Portanto, em todos esses casos, será necessário nivelar os dados que você está exportando de seus documentos.
Para projetar seus documentos em uma estrutura plana, você precisará usar os métodos
aggregate_*_all
mais poderosos que o PyMongoArrow adiciona aos seus objetos PyMongo Collection.Em um pipeline de agregação, você pode adicionar um estágio
$project
à sua consulta para projetar os campos aninhados que deseja na sua tabela para campos de nível superior no resultado da agregação.Para testar meu estágio
$project
, primeiro o executei com a função agregada padrão do PyMongo. Converti-o em um list
para que o Jupyter exibisse os resultados.1 list(collection.aggregate([ 2 {'$match': {'_id': bson.ObjectId("5553a998e4b02cf7151190bf")}}, 3 {'$project': { 4 'windDirection': '$wind.direction.angle', 5 'windSpeed': '$wind.speed.rate', 6 }} 7 ])) 8 9 [{'_id': ObjectId('5553a998e4b02cf7151190bf'), 10 'windDirection': 100, 11 'windSpeed': 3.1}]
Como fiz a correspondência de um único documento por "ID," apenas um documento é retornado, mas você pode ver que o estágio
$project
mapeou $wind.direction.angle
para o campo de nível superior "windDirection" no resultado, e o mesmo com $wind.speed.rate
e "windSpeed" no resultado.Posso pegar esse estágio
$project
e usá-lo para nivelar todos os resultados de uma query de agregação e, em seguida, fornecer um esquema para identificar "windDirection" como um valor inteiro e "windspeed" como um número de ponto flutuante, assim:1 collection.aggregate_pandas_all([ 2 {'$project': { 3 'windDirection': '$wind.direction.angle', 4 'windSpeed': '$wind.speed.rate', 5 }} 6 ], 7 schema=Schema({'windDirection': int, 'windSpeed': float}) 8 )
uma | B | C |
---|---|---|
windDirection | windSpeed | |
0 | 100 | 3.1 |
1 | 50 | 9.0 |
2 | 30 | 7.7 |
3 | 270 | 19.0 |
4 | 50 | 8.2 |
... | ... | ... |
9995 | 10 | 7.0 |
9996 | 60 | 5.7 |
9997 | 330 | 3.0 |
9998 | 140 | 7.7 |
9999 | 80 | 8.2 |
10000 linhas × 2 colunas
Existem apenas 10000 documentos nessa coleção, mas alguns benchmarks básicos que escrevi mostram que isso é cerca 20% mais rápido do que trabalhar diretamente com
DataFrame.from_records
e PyMongo
. Com conjuntos de dados maiores, esperaria que a diferença no desempenho fosse mais significativa. É cedo para a biblioteca PyMongoArrow e, portanto, há algumas limitações no momento, como as que mencionei acima, mas o futuro parece positivo para essa biblioteca ao fornecer mapeamentos rápidos entre suas coleções ricas e flexíveis do MongoDB e qualquer outra coleção -requisitos de análise de memória que você pode ter com Arrow, Pandas ou Numpy.Se você está planejando fazer muitas análises de dados armazenados no MongoDB, verifique se está atualizado sobre os recursos mais recentes da poderosa estrutura de agregaçãodo MongoDB. Você pode fazer muitas coisas no banco de dados, portanto, talvez não seja necessário exportar seus dados. Você pode se conectar a servidores secundários em seu cluster para reduzir a carga no primário para queries de análise, ou até mesmo ter nós de análise dedicados para executar esses tipos de queries. Confira MongoDB 5.0 As novas funções de janela e se você estiver trabalhando com dados de série temporal, com certeza vai querer saber sobre o MongoDB 5.As novascoleções de séries temporais do0 .