Preparando dados de série temporal para ferramentas de análise com $densify e $fill
Avalie esse Tutorial
Os dados de séries temporais referem-se a registros de valores contínuos em pontos específicos no tempo. Esses dados são então examinados, não como pontos de dados individuais, mas como um valor muda ao longo do tempo ou se correlaciona com outros valores no mesmo período de tempo.
Normalmente, os pontos de dados teriam um registro de data e hora, um ou mais valores de metadados para identificar a fonte dos dados e um ou mais valores conhecidos como medições. Por exemplo, um ticker de ações teria hora, símbolo da ação (metadados) e preço (medição), enquanto os dados de rastreamento de aeronaves podem ter hora, número da cauda e vários valores de medição, como velocidade, direção, altitude e taxa de subida. Quando registramos esses dados no MongoDB, também podemos incluir metadados de categoria adicionais para auxiliar na análise. Por exemplo, no caso do rastreamento de voos, podemos armazenar o número da cauda como um identificador exclusivo, mas também o tipo de aeronave e o proprietário como metadados adicionais, o que nos permite analisar os dados com base em categorias mais amplas.
A análise de dados de séries temporais geralmente é para identificar uma correlação anteriormente desconhecida entre pontos de dados ou para tentar prever padrões e, portanto, leituras futuras. Há muitas ferramentas e técnicas, incluindo aprendizado de máquina e análise de Fourier, aplicadas para examinar as alterações no fluxo de dados e prever quais serão as leituras futuras. No mundo das altas finanças, setores e carreiras inteiros foram construídos com base na tentativa de dizer qual será o próximo comportamento do preço das ações.
Algumas dessas técnicas analíticas exigem que os dados estejam em um formato específico, sem leituras ausentes e com períodos de tempo regularmente espaçados, ou ambos, mas os dados nem sempre estão disponíveis nesse formato.
Alguns dados são espaçados regularmente; um processo industrial pode registrar leituras de sensores em intervalos precisos. No entanto, há motivos para a falta de dados. Pode ser falha de software ou por motivos sociais externos. Imagine que estamos examinando o número de clientes nas estações do metrô de Londres. Podemos ter uma contagem diária que usamos para prever o uso no futuro, mas uma greve ou outro evento externo pode fazer com que esse tráfego caia para zero em determinados dias. De uma perspectiva analítica, queremos substituir essa contagem ausente ou contagem de zero por um valor mais típico.
Alguns conjuntos de dados são inerentemente irregulares. Ao medir coisas no mundo real, talvez não seja possível fazer leituras em uma cadência regular porque são eventos discretos que acontecem de forma irregular, como registrar tornados ou neutrinos detectados. Outros dados do mundo real podem ser um evento contínuo, mas só podemos observá-los em momentos aleatórios. Imagine que estamos rastreando um grupo de baleias pelo oceano. Eles estão em algum lugar o tempo todo, mas só podemos gravá-los quando os vemos ou quando um dispositivo de rastreamento está ao alcance de um receptor.
Depois de dar esses exemplos, é mais fácil explicar a funcionalidade real disponível para densificação e preenchimento de lacunas usando um conceito genérico de tempo e leituras em vez de exemplos específicos, portanto, faremos isso a seguir. Esses estágios de agregação funcionam tanto em séries temporais quanto em coleções regulares.
O estágio de aggregation
$densify
adicionado no MongoDB 5.2 permite criar documentos ausentes na série preenchendo um documento onde não se está presente em um conjunto espaçado regularmente ou inserindo documentos em intervalos espaçados regularmente entre os pontos de dados existentes em um conjunto espaçado irregularmente.Imagine que temos um conjunto de dados em que obtemos uma leitura uma vez por minuto, mas, às vezes, estamos faltando leituras. Podemos criar dados como esse no mongosh shell abrangendo os 20 minutos anteriores usando o script a seguir. Isso começa com a criação de um registro com o tempo atual e, em seguida, subtrai 60000 milissegundos dele até que tenhamos 20 documentos. Ele também falha ao inserir qualquer documento em que o iterador divida uniformemente por 7 para criar registros ausentes.
1 db=db.getSiblingDB('tsdemo') 2 db.data.drop() 3 4 let timestamp =new Date() 5 for(let reading=0;reading<20;reading++) { 6 timestamp = new Date(timestamp.getTime() - 60000) 7 if(reading%7) db.data.insertOne({reading,timestamp}) 8 }
Embora possamos visualizá-los como texto usando db.data.find() , é melhor se pudermos visualizá-los. Idealmente, usaremos MongoDB Charts para isso. No entanto, essas funções ainda não estão todas disponíveis para nós no Atlas e Charts com a camada gratuita, então estou usando uma instalação local de pré-lançamento do MongoDB 5.3 e o mongosh shell gravando um gráfico em HTML. Podemos definir uma função de gráfico colando o seguinte código no mongosh ou salvando-o em um arquivo e carregando-o com o comando
load()
no mongosh. Observe que você precisa modificar a palavra abrir no roteiro abaixo de acordo com os comentários para corresponder ao comando que seu sistema operacional usa para abrir um arquivo HTML em um navegador.1 function graphTime(data) 2 { 3 let fs=require("fs") 4 let exec = require('child_process').exec; 5 let content = ` 6 <script src="https://cdn.jsdelivr.net/npm/chart.js/dist/chart.min.js"></script> 7 <script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns/dist/chartjs-adapter-date-fns.bundle.min.js"></script> 8 <canvas id="myChart" style="width:100%"></canvas> 9 <script> 10 var xyreadings = ${JSON.stringify(tsdata)} 11 new Chart("myChart", { 12 type: "scatter", data: { datasets: [{ pointRadius: 4, pointBackgroundColor: "rgba(0,0,255,0.5)", data: xyreadings }] }, 13 options: { scales:{ xAxis: {type: 'time',time:{unit:'minute'}}}}}); 14 </script>` 15 16 try { 17 let rval = fs.writeFileSync('graph.html', content) 18 //Linux use xdg-open not open 19 //Windows use start not open 20 //Mac uses open 21 rval = exec('open graph.html',null); //←---- ADJUST FOR OS 22 } catch (err) { 23 console.error(err) 24 } 25 }
Agora podemos visualizar os dados de amostra que adicionamos executando uma consulta e passando-a para a função.
1 let tsdata = db.data.find({},{_id:0,y:"$reading",x:"$timestamp"}).toArray() 2 3 graphTime(tsdata)
E podemos ver nossos pontos de dados traçados dessa forma
Neste gráfico, as linhas de grade verticais finos mostram minutos e os pontos azul são nossos pontos de dados. Observe que os pontos azuis estão uniformemente espaçados na horizontal, embora não estejam alinhados com os minutos exatos. Uma leitura que é feita a cada minuto não exige que seja feita exatamente em 0 segundos desse minuto. Podemos ver que estamos faltando alguns pontos.
Podemos adicionar esses pontos ao ler os dados usando
$densify
. Embora inicialmente não tenhamos um valor para eles, podemos pelo menos criar documentos de espaço reservado com o registro de data e hora correto.Para fazer isso, lemos os dados usando um pipeline de agregação de dois estágios, conforme abaixo, especificando o campo que precisamos adicionar, a magnitude do tempo entre as leituras e se desejamos aplicá-lo a alguns ou a todos os pontos de dados. Também podemos ter escalas separadas com base em categorias de dados, adicionando pontos de dados ausentes para cada avião ou sensor distinto, por exemplo. No nosso caso, aplicaremos a todos os dados, pois estamos lendo apenas uma métrica em nosso exemplo simples.
1 let densify = { $densify : { field: "timestamp", 2 range: { step: 1, unit: "minute", bounds: "full" }}} 3 4 let projection = {$project: {_id:0, y: {$ifNull:["$reading",0]},x:"$timestamp"}} 5 6 let tsdata = db.data.aggregate([densify,projection]).toArray() 7 8 graphTime(tsdata)
Esse pipeline adiciona novos documentos com o valor necessário de registro de data e hora sempre que um estiver ausente. Ele não adiciona nenhum outro campo a esses documentos, para que eles não apareçam em nosso gráfico. Os documentos criados ficam assim, sem leitura ou campo_id.
1 { 2 timestamp : ISODate("2022-03-23T17:55:32.485Z") 3 }
Para corrigir isso, acompanhei isso com uma projeção que define a leitura como 0 se ela não existir usando
$ifNull
. Isso é chamado de preenchimento zero e fornece saída da mesma forma. Para ser útil, é quase certo que precisamos obter uma estimativa melhor do que zero para essas leituras ausentes — podemos fazer isso usando
$fill
.O estágio de aggregation
$fill
foi adicionado no MongoDB 5.3 e pode substituir leituras nulas ou ausentes em documentos, estimando-as com base nos valores não nulos de cada lado (ignorar nulos permite que ele leve em conta vários valores ausentes em uma linha). Ainda precisamos usar $densify
para adicionar os documentos ausentes em primeiro lugar, mas, quando os tivermos, em vez de adicionar uma leitura zero usando $set
ou $project
, podemos usar $fill
para calcular valores mais significativos.Para usar
$fill
, você precisa ser capaz de classificar os dados de maneira significativa, pois as leituras ausentes serão derivadas das leituras anteriores e posteriores. Em muitos casos, você classificará por tempo, embora outros dados de intervalo possam ser usados.Podemos computar os valores ausentes dessa forma, especificando o campo a ser ordenado, o campo que queremos adicionar se estiver ausente e o método - nesse caso,
locf
, que repete o mesmo valor do ponto de dados anterior.1 let densify = { $densify : { field: "timestamp", 2 range: { step: 1, unit: "minute", bounds : "full" }}} 3 4 let fill = { $fill : { sortBy: { timestamp:1}, 5 output: { reading : { method: "locf"}}}} 6 7 let projection = {$project: {_id:0,y:"$reading" ,x:"$timestamp"}} 8 9 let tsdata = db.data.aggregate([densify,fill,projection]).toArray() 10 11 graphTime(tsdata)
Isso cria um conjunto de valores como este.
No entanto, nesse caso, esses pontos adicionados parecem errados. Simplesmente optar por repetir a leitura anterior não é o ideal aqui. Em vez disso, podemos aplicar uma interpolação linear, desenhando uma linha imaginária entre os pontos antes e depois da lacuna e pegando o ponto em que nosso carimbo de data/hora cruza essa linha. Para isso, alteramos
locf
para linear
em nosso $fill
.1 let densify = { $densify : { field: "timestamp", 2 range : { step: 1, unit: "minute", bounds : "full" }}} 3 4 let fill = { $fill : { sortBy: { timestamp:1}, 5 output: { reading : { method: "linear"}}}} 6 7 let projection = {$project: {_id:0,y:"$reading" ,x:"$timestamp"}} 8 9 let tsdata = db.data.aggregate([densify,fill,projection]).toArray() 10 graphTime(tsdata)
Agora obtemos o seguinte gráfico, que, neste caso, parece muito mais apropriado.
Podemos ver como adicionar valores ausentes em dados regularmente espaçados, mas como convertemos dados irregularmente espaçados em espaçados regularmente, se é isso que nossa análise exige?
Imagine que temos um conjunto de dados em que obtemos uma leitura aproximadamente uma vez por minuto, mas com espaçamento desigual. Às vezes, o tempo entre as leituras é 20 segundos e, às vezes, é de 80 segundos. Em média, é uma vez por minuto, mas o algoritmo que queremos aplicar a ele precisa de dados uniformemente espaçados. Desta vez, criaremos dados aperiódicas como essa na shell do mongosh abrangendo os 20 minutos anteriores, com alguma variação no tempo e uma leitura decrescente constante.
1 db.db.getSiblingDB('tsdemo') 2 3 db.data.drop() 4 5 let timestamp =new Date() 6 let start = timestamp; 7 for(let i=0;i<20;i++) { 8 timestamp = new Date(timestamp.getTime() - Math.random()*60000 - 20000) 9 let reading = (start-timestamp)/60000 10 db.data.insertOne({reading,timestamp}) 11 }
Quando traçamos isso, podemos ver que os pontos não estão mais espaçados uniformemente. No entanto, exigimos dados periódicos para nosso trabalho de análise downstream, então como podemos corrigir isso? Não podemos simplesmente numerar os horários nas leituras existentes. Podemos nem ter um para cada minuto, e os valores seriam imprecisos para o tempo.
1 let tsdata = db.data.find({},{_id:0,y:"$reading",x:"$timestamp"}).toArray() 2 3 graphTime(tsdata)
Podemos resolver isso usando $densify para adicionar os pontos que precisamos, $fill para calcular seus valores com base no valor mais próximo do nosso conjunto original e, em seguida, remover os registros originais do conjunto. Precisamos adicionar um campo extra aos originais antes da densificação para identificá-los. Podemos fazer isso com $set. Observe que tudo isso está dentro do pipeline de agregação. Não estamos editando registros no banco de dados, portanto, não há custo significativo associado a isso.
1 let flagOriginal = {$set: {original:true}} 2 3 let densify = { $densify: { field: "timestamp", 4 range: { step: 1, unit: "minute", bounds : "full" }}} 5 6 7 let fill = { $fill : { sortBy: { timestamp:1}, 8 output: { reading : { method: "linear"} }}} 9 10 let projection = {$project: {_id:0,y:"$reading" ,x:"$timestamp"}} 11 12 let tsdata = db.data.aggregate([flagOriginal, densify,fill,projection]).toArray() 13 graphTime(tsdata)
Agora temos aproximadamente o dobro do número de pontos de dados, originais e gerados, mas podemos usar $match para remover aqueles que sinalizamos como pré-densificação existentes.
1 let flagOriginal = {$set : {original:true}} 2 3 let densify = { $densify : { field: "timestamp", 4 range: { step: 1, unit: "minute", bounds : "full" }}} 5 6 7 let fill = { $fill : { sortBy: { timestamp:1}, 8 output: { reading : { method: "linear"} }}} 9 10 let removeOriginal = { $match : { original : {$ne:true}}} 11 12 let projection = {$project: {_id:0,y:"$reading" ,x:"$timestamp"}} 13 14 let tsdata = db.data.aggregate([flagOriginal, densify,fill, 15 removeOriginal, projection]).toArray() 16 17 graphTime(tsdata)
Finalmente, temos dados espaçados uniformemente com valores calculados com base nos pontos de dados que temos. Teríamos preenchido todos os valores ausentes ou grandes lacunas no processo.
Os novos estágios
$densify
e $fill
podem não parecer muito empolgantes no início, mas são ferramentas fundamentais para trabalhar com dados de séries temporais. Sem $densify
, não há como identificar e adicionar de forma significativa os registros ausentes em uma série temporal. O estágio $fill simplifica muito o processo de computação de valores ausentes em comparação com o uso de $setWindowFields
e a escrita de uma expressão para determinar o valor usando as expressões $linear e $locf ou computando uma média móvel.Isso abre a possibilidade de usar uma ampla gama de algoritmos de análise de séries temporais em Python, R, Spark e outros ambientes analíticos.