Analise dados de séries temporais com Python e MongoDB usando PyMongoArrow e Pandas
Avalie esse Tutorial
No mundo atual centrado em dados, os dados de séries temporais se tornaram indispensáveis para impulsionar as principais decisões organizacionais, análises de tendências e previsões. Esse tipo de dado está em toda parte, desde mercados de ações e sensores de IoT até análises de comportamento do usuário. Mas, à medida que esses conjuntos de dados crescem em volume e complexidade, também aumenta o desafio de armazená-los e analisá-los com eficiência. Seja você um desenvolvedor de IoT ou um analista de dados que lida com informações urgentes, o MongoDB oferece um ecossistema robusto feito sob medida para atender às suas necessidades de armazenamento e análise de dados complexos de séries temporais.
O MongoDB tem suporte embutido para armazenar dados de séries temporais em um tipo especial de coleção chamado coleção de séries temporais. As coleções de séries temporais são diferentes das coleções normais. As coleções de séries temporais usam um formato de armazenamento colunar subjacente e armazenam dados em ordem de tempo com um índice clusterizadocriado automaticamente. O formato de armazenamento em colunas oferece os seguintes benefícios:
- Menos complexidade: o formato em colunas é personalizado para dados de série temporal, facilitando o gerenciamento e a consulta.
- Eficiência da query: o MongoDB cria automaticamente um índice interno agrupado no campo de tempo que melhora o desempenho da query.
- Uso do disco: essa abordagem de armazenamento usa o espaço em disco com mais eficiência em comparação com as coleções tradicionais.
- Otimização de E/S: as operações de leitura exigem menos operações de entrada/saída, melhorando o desempenho geral do sistema.
- Uso do cache: O design permite uma melhor utilização do cache do WiredTiger, melhorando ainda mais o desempenho da query.
Neste tutorial, criaremos uma coleção de séries temporais e, em seguida, armazenaremos alguns dados de séries temporais nela. Veremos como você pode consultá-los no MongoDB e como você pode ler esses dados no pandas DataFrame, executar algumas análises e gravar os dados modificados de volta no MongoDB. Este tutorial foi projetado para ser um detalhamento completo do trabalho com dados de séries temporais no MongoDB.
Usaremos as seguintes ferramentas/frameworks:
- Banco de dados MongoDB Atlas, para armazenar nossos dados de série temporal. Se você ainda não tiver um Atlas cluster criado, vá em frente e crie um, configure um usuário e adicione seu endereço IP de conexão à sua lista de acesso IP.
- Driver PyMongo(para se conectar ao seu banco de dados MongoDB Atlas, consulte as instruçõesde instalação ).
Observação: antes de executar qualquer código ou instalar qualquer pacote do Python, é altamente recomendável configurar um ambiente Python separado. Isso ajuda a isolar dependências, gerenciar pacotes e evitar conflitos que possam surgir de diferentes versões de pacotes. Criar um ambiente é uma etapa opcional, mas altamente recomendada.
Neste ponto, estamos assumindo que você tem um Atlas cluster criado e pronto para ser usado, e PyMongo e Jupyter Notebook instalados. Go em frente e iniciar o Jupyter Notebook executando o seguinte comando no terminal:
1 Jupyter Notebook
Depois que o Jupyter Notebook estiver instalado e em execução, vamos buscar a connection string do MongoDB Atlas cluster e armazená-la como uma variável de ambiente, que usaremos mais tarde para nos conectar ao nosso banco de dados. Depois de fazer isso, vamos nos conectar ao nosso Atlas cluster executando os seguintes comandos:
1 import pymongo 2 import os 3 4 from pymongo import MongoClient 5 6 MONGO_CONN_STRING = os.environ.get("MONGODB_CONNECTION_STRING") 7 8 client = MongoClient(MONGO_CONN_STRING)
Em seguida, vamos criar um novo banco de dados e uma coleção em nosso cluster para armazenar os dados de séries temporais. Chamaremos esse banco de dados de "stock_data " e a collection de "stocks ".
1 # Let's create a new database called "stock data" 2 db = client.stock_data 3 4 # Let's create a new time-series collection in the "stock data" database called "stocks" 5 6 collection = db.create_collection('stocks', timeseries={ 7 8 timeField: "timestamp", 9 metaField: "metadata", 10 granularity: "hours" 11 12 })
Aqui, usamos o método db.create_collection() para criar uma time-series collection chamada "stock ". No exemplo acima, "timeField ", "metaField " e "granularity " são campos reservados (para obter mais informações sobre o que são, acesse nossa documentação). A opção “timeField” especifica o nome do campo em sua collection que conterá a data em cada documento de série temporal.
A opção “metaField” especifica o nome do campo em sua coleção que conterá os metadados em cada documento de série temporal.
Por fim, a opção “granularity” especifica com que frequência os dados serão ingeridos em sua coleta de séries temporais.
Agora, vamos inserir algumas informações relacionadas a ações em nossa coleção. Estamos interessados em armazenar e analisar as ações de uma empresa específica chamada "XYZ", que negocia suas ações em "NASDAQ".
Estamos armazenando algumas métricas de preço dessa ação em um intervalo de hora em hora e, para cada intervalo de tempo, armazenamos as seguintes informações:
- Abertura: o preço de abertura pelo qual a ação foi negociada quando o mercado abriu
- Fechamento: o preço final pelo qual a ação foi negociada quando o período de negociação terminou
- Alta: o preço mais alto pelo qual a ação foi negociada durante o período de negociação
- Baixo: o preço mais baixo pelo qual a ação foi negociada durante o período de negociação
- Volume: o número total de ações negociadas durante o período de negociação
Agora que nos tornamos especialistas em negociação de ações e terminologia (sarcasmo), inseriremos alguns documentos em nossa coleção de séries temporais. Aqui temos quatro documentos de amostra. Os pontos de dados são capturados em um intervalo de uma hora.
1 # Create some sample data 2 3 data = [ 4 { 5 "metadata": { 6 "stockSymbol": "ABC", 7 "exchange": "NASDAQ" 8 }, 9 "timestamp": datetime(2023, 9, 12, 15, 19, 48), 10 "open": 54.80, 11 "high": 59.20, 12 "low": 52.60, 13 "close": 53.50, 14 "volume": 18000 15 }, 16 17 { 18 "metadata": { 19 "stockSymbol": "ABC", 20 "exchange": "NASDAQ" 21 }, 22 "timestamp": datetime(2023, 9, 12, 16, 19, 48), 23 "open": 51.00, 24 "high": 54.30, 25 "low": 50.50, 26 "close": 51.80, 27 "volume": 12000 28 }, 29 30 { 31 "metadata": { 32 "stockSymbol": "ABC", 33 "exchange": "NASDAQ" 34 }, 35 "timestamp":datetime(2023, 9, 12, 17, 19, 48), 36 "open": 52.00, 37 "high": 53.10, 38 "low": 50.50, 39 "close": 52.90, 40 "volume": 10000 41 }, 42 43 { 44 "metadata": { 45 "stockSymbol": "ABC", 46 "exchange": "NASDAQ" 47 }, 48 "timestamp":datetime(2023, 9, 12, 18, 19, 48), 49 "open": 52.80, 50 "high": 60.20, 51 "low": 52.60, 52 "close": 55.50, 53 "volume": 30000 54 } 55 ] 56 57 # insert the data into our collection 58 59 collection.insert_many(data)
Agora, vamos executar uma consulta de localização em nossa collection para recuperar dados em um registro de data/hora específico. Execute esta query no Jupyter Notebook após o script anterior.
1 collection.find_one({'timestamp': datetime(2023, 9, 12, 15, 19, 48)})
/SAÍDA
Como você pode ver na saída, conseguimos consultar nossa coleção de séries temporais e recuperar pontos de dados em um carimbo de data/hora específico.
Da mesma forma, você pode executar consultas mais poderosas em sua coleção de séries temporais usando o pipeline de agregação. Para o escopo deste tutorial, não abordaremos isso. Mas, se você quiser saber mais sobre isso, aqui é onde você pode ir:
Agora, vamos ver como você pode mover seus dados de séries temporais para o DataFrame do pandas para executar algumas operações de análise.
O MongoDB criou uma ferramenta apenas para essa finalidade , chamada PyMongoArrow. PyMongoArrow é uma biblioteca Python que permite mover dados para dentro e para fora do MongoDB para outros formatos de dados, como Pandas DataFrame, array Numpy e Arrow Table.
Vamos instalar rapidamente o PyMongoArrow usando o comando pip em seu terminal. Estamos presumindo que você já tem o Pandas instalado em seu sistema. Caso contrário, você também pode usar o comando pip para instalá-lo.
1 pip install pymongoarrow
Agora, vamos importar todas as bibliotecas necessárias. Vamos usar o mesmo arquivo ou bloco de anotações (Jupyter Notebook) para executar os códigos abaixo.
1 import pymongoarrow 2 import pandas as pd 3 4 # pymongoarrow.monkey module provided an interface to patch pymongo, in place, and add pymongoarrow's functionality directly to collection instance. 5 6 from pymongoarrow.monkey import patch_all 7 patch_all() 8 9 # Let's use the pymongoarrow's find_pandas_all() function to read MongoDB query result sets into 10 11 df = collection.find_pandas_all({})
Agora, lemos todos os nossos dados de estoque armazenados na collection "stocks" em um DataFrame pandas 'df'.
Vamos imprimir rapidamente o valor armazenado na variável "df" para verificá-lo.
1 print(df) 2 3 print(type(df))
/SAÍDA
Viva...parabens! Como você pode ver, lemos com sucesso nossos dados do MongoDB no Pandas DataFrame.
Agora, se você é um trader do mercado de ações, estaria interessado em fazer muitas análises sobre esses dados para obter insights significativos. Mas, para este tutorial, vamos apenas calcular a variação percentual horária nos preços de fechamento da ação. Isso nos ajudará a entender os movimentos diários de preços em termos de ganhos ou perdas percentuais.
Adicionaremos uma nova coluna em nosso DataFrame 'df' chamado "daily_pct_change ".
1 df = df.sort_values('timestamp') 2 3 df['daily_pct_change'] = df['close'].pct_change() * 100 4 5 # print the dataframe to see the modified data 6 print(df)
/SAÍDA
Como você pode ver, adicionamos com sucesso uma nova coluna ao nosso DataFrame.
Agora, gostaria de persistir os dados DataFrame modificados em um banco de dados para que possamos executar mais análises mais tarde. Então, vamos escrever esses dados de volta no MongoDB usando a função de escrita do PyMongoArrow.
Vamos apenas criar uma nova coleção chamada “my_new_collection” em nosso banco de dados para gravar o DataFrame modificado de volta no MongoDB, garantindo a persistência dos dados.
1 from pymongoarrow.api import write 2 3 coll = db.my_new_collection 4 5 # write data from pandas into MongoDB collection called 'coll' 6 write(coll, df) 7 8 # Now, let's verify that the modified data has been written into our collection 9 10 print(coll.find_one({}))
Parabéns por concluir este tutorial com sucesso.
Neste tutorial, abordamos como trabalhar com dados de série temporal usando MongoDB e Python. Aprendemos como armazenar dados do mercado de ações em uma coleção de séries temporais do MongoDB e, em seguida, como realizar análises simples usando um DataFrame do pandas. Também exploramos como o PyMongoArrow facilita a movimentação de dados entre MongoDB e Pandas. Finalmente, salvamos nossos dados analisados de volta no MongoDB. Este guia fornece uma maneira direta de gerenciar, analisar e armazenar dados de séries temporais. Ótimo trabalho se você acompanhou - agora você está pronto para lidar com dados de séries temporais em seus próprios projetos.
Se você quiser saber mais sobre o PyMongoArrow, confira alguns desses recursos adicionais: