Como preenchemos 2 milhões de documentos de banco de dados
Avalie esse Tutorial
Recentemente, precisávamos preencher quase dois milhões de documentos em nosso MongoDB database com um novo atributo e queríamos compartilhar nosso processo. Primeiro, um contexto sobre por que estávamos fazendo isso: esse preenchimento foi para apoiar a equipe de crescimento da Netlify, que cria protótipos no produto principal da Netlify e depois avalia como esses protótipos afetam as taxas de conversão eretenção de usuários.
Se descobrirmos que um protótipo afeta positivamente o crescimento, usaremos essa descoberta para moldar investimentos mais profundos em uma área específica do produto. Nesse caso, para medir o impacto de um protótipo, precisávamos adicionar um atributo que não existia anteriormente a um de nossos modelos de banco de dados.
Com isso fora do caminho, vamos mostrar como fizemos isso!
O engenheiro de back-end Eric Betts e eu começamos com um script de uma versão menor desta tarefa: preenchimento de 130,000 documentos. O preenchimento menor levou cerca de 11 horas, incluindo tempo para ajustar o script e reiniciá-lo algumas vezes quando ele morreu. Com uma taxa de preenchimento de 175a200 documentos por minuto, estávamos olhando para o melhor cenário de oito a nove dias seguidos de preenchimento para mais de dois milhões de documentos no total, e isso supondo que tudo tenha ocorrido sem problemas. Com um preenchimento muito maior pela frente, precisvamos ver se poderíamos otimizar.
O script inicial recebeu dois argumentos - um tamanho
batch_size
e thread_pool_size
- e funcionou assim:- Criar uma nova fila.
- Crie uma variável para armazenar o número de documentos que processamos.
- Consulte o banco de dados, limitando os resultados retornados ao
batch_size
que passamos. - Envie cada documento retornado para a fila.
- Crie o número de threads de trabalho que passamos com o argumento
thread_pool_size
. - Cada thread faz uma chamada de API para uma API de terceiros e, em seguida, grava nosso novo atributo em nosso banco de dados com o resultado da API de terceiros.
- Atualize nossa contagem de documentos processados.
- Quando não houver mais documentos na fila para processar, limpe os threads.
O roteiro é executado em um pod do Kubernetes com restrições de memória e CPU. Ele lê do nosso MongoDB database de produção e grava em um secundário.
Ao ampliar o script original para processar 20 vezes o número de documentos, rapidamente encontramos algumas limitações:
Restrições de memória do pod. A execução do script com
batch_size
de dois milhões de documentos e thread_pool_size
de cinco foi imediatamente eliminada pelo pod do Kubernetes:1 Backfill.run(2000000, 5)
Muita intervenção manual. Correr com
batch_size
de 100 e thread_pool
de cinco funcionou muito melhor:1 Backfill.run(100, 5)
Correu super rápido 🚀, não houve erros ✨ ... mas teríamos que executá-lo manualmente 20,000 vezes.
Limites de taxa de API de terceiros. Mesmo com um
batch_size
confiável , não podíamos aumentar muito o thread_pool_size
ou atingiríamos os limites de taxa na API de terceiros. Nosso script terminaria de ser executado, mas muitos de nossos documentos não seriam preenchidos e teríamos que iterar sobre eles novamente.Eric e eu precisávamos de algo que atendesse aos seguintes critérios:
- Não usa tanta memória a ponto de matar o pod do Kubernetes.
- Não usa tanta memória que aumente visivelmente a latência de leitura/gravação do banco de dados.
- Itera através de um lote completo de objetos de cada vez; a tarefa não deve terminar antes de pelo menos tentar processar um lote completo.
- Requer o mínimo de serviço de babá. Algumas intervenções manuais são boas, mas precisamos de um trabalho para ser executado por várias horas por si só.
- Vamos retomar de onde paramos. Se o trabalho morrer, não queremos perder tempo reprocessando documentos que já processamos uma vez.
Com essa lista de critérios, começamos a pensar em soluções. Poderíamos:
- Descubra por que o script estava expirando antes de processar o lote completo.
- Armazene referências a documentos que não foram atualizados e retorne a eles mais tarde.
- Encontre uma maneira de ordenar os resultados retornados pelo banco de dados.
- Adicionar automaticamente mais trabalhos à fila depois que o lote inicial for processado.
#1 era uma necessidade obvia. Começamos a registrar o índice do thread para ver se ele nos informava algo:
1 def self.run(batch_size, thread_pool_size) 2 jobs = Queue.new 3 4 # get all the documents that meet these criteria 5 objs = Obj.where(...) 6 # limit the returned objects to the batch_size 7 objs = objs.limit(batch_size) 8 # push each document into the jobs queue to be processed 9 objs.each { |o| jobs.push o } 10 11 # create a thread pool 12 workers = (thread_pool_size).times.map do |i| 13 Thread.new do 14 begin 15 while j = jobs.pop(true) 16 # log the thread index and object ID 17 Rails.logger.with_fields(thread: i, obj: obj.id) 18 begin 19 # process objects 20 end 21 ...
Esta nova linha de log nos permite ver os threads morrendo à medida que o script é executado. Go a executar com cinco threads:
1 thread="4" obj="939bpca..." 2 thread="1" obj="939apca..." 3 thread="5" obj="939cpca..." 4 thread="2" obj="939dpca..." 5 thread="3" obj="939fpca..." 6 thread="4" obj="969bpca..." 7 thread="1" obj="969apca..." 8 thread="5" obj="969cpca..." 9 thread="2" obj="969dpca..." 10 thread="3" obj="969fpca..."
para correr com alguns:
1 thread="4" obj="989bpca..." 2 thread="1" obj="989apca..." 3 thread="4" obj="979bpca..." 4 thread="1" obj="979apca..."
para correr sem nenhum.
Notamos que, quando um thread atingia um erro em uma solicitação de API ou em uma gravação em nosso banco de dados, estavámos salvando e imprimindo o erro, mas não continuando com o loop. Foi uma correção simples: quando
rescue
, continue com a iteraçãonext
do loop.1 begin 2 # process documents 3 rescue 4 next 5 end
Em uma nova execução do script, precisávamos de uma maneira de continuar de onde paramos. A ideia #2- acompanhar as falhas nas iterações do script - era tecnicamente possível, mas não seria bonita. Esperávamos que a ideia #3- ordenar os resultados da consulta - resolvesse o mesmo problema, mas de uma maneira melhor, então optamos por isso. Eric teve a ideia de ordenar os resultados da nossa consulta por
created_at
data. Dessa forma, poderíamos passar um argumento de data denot_before
ao executar o script para garantir que não estávamos processando objetos já processados. Também podíamos imprimir a data decreated_at
de cada documento à medida que era processado, para que, se o script morresse, pudéssemos pegar essa data e passá-la para a próxima execução. Aqui está o que parecia:1 def self.run(batch_size, thread_pool_size, not_before) 2 jobs = Queue.new 3 4 # order the query results in ascending order 5 objs = Obj.where(...).order(created_at: -1) 6 # get documents created after the not_before date 7 objs = objs.where(:created_at.gte => not_before) 8 # limit the returned documents to the batch_size 9 objs = objs.limit(batch_size) 10 # push each document into the jobs queue to be processed 11 objs.each { |o| jobs.push o } 12 13 workers = (thread_pool_size).times.map do |i| 14 Thread.new do 15 begin 16 while j = jobs.pop(true) 17 # log each document's created_at date as it's processed 18 Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at) 19 begin 20 # process documents 21 rescue 22 next 23 end 24 ...
Então, uma linha de log pode se parecer com:
thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"
E se o script morresse depois dessa linha, poderíamos pegar essa data e devolvê-la:
Backfill.run(50000, 10, "Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00")
Nice!
Infelizmente, quando adicionamos a ordem, descobrimos que, sem querer, introduzimos uma nova limitação de memória: os resultados da consulta eram classificados na memória, para que não pudéssemos passar um lote muito grande ou ficaríamos sem memória no pod do Kubernetes. Isso reduziu substancialmente o tamanho do nosso lote, mas aceitamos a desvantagem, pois eliminou a possibilidade de refazer o trabalho que já havia sido feito.
A última tarefa crítica era fazer com que nossa fila se completasse depois que o lote original de documentos fosse processado.
Nossa primeira abordagem foi verificar o tamanho da fila, adicionar mais objetos à fila quando o tamanho da fila atingisse algum limite e executar novamente a consulta original, mas ignorar todos os resultados da consulta retornados que já havíamos processado. Armazenamos o número que já processamos em uma variável chamada
skip_value
. Cada vez que adicionamos algo à fila, aumentaríamos skip_value
e pularíamos um número cada vez maior de resultados.Você pode ver aonde isso vai dar. Em algum momento, tentaríamos pular um valor muito grande, ficaríamos sem memória, não conseguiríamos reabastecer a fila e o trabalho morreria.
1 skip_value = batch_size 2 step = batch_size 3 4 loop do 5 if jobs.size < 1000 6 objs = Obj.where(...).order(created_at: -1) 7 objs = objs.where(:created_at.gte => created_at) 8 objs = objs.skip(skip_value).limit(step) # <--- job dies when this skip_value gets too big ❌ 9 objs.each { |r| jobs.push r } 10 11 skip_value += step # <--- this keeps increasing as we process more objects ❌ 12 13 if objs.count == 0 14 break 15 end 16 end 17 end
Por fim, descartamos o aumento
skip_value
, optando por armazenar a datacreated_at
do último objeto processado. Dessa forma, poderíamos pular um número constante e relativamente baixo de documentos em vez de desacelerar e eventualmente encerrar nossa consulta pulando um número crescente:1 refill_at = 1000 2 step = batch_size 3 4 loop do 5 if jobs.size < refill_at 6 objs = Obj.where(...).order(created_at: -1) 7 objs = objs.where(:created_at.gte => last_created_at) # <--- grab last_created_at constant from earlier in the script ✅ 8 objs = objs.skip(refill_at).limit(step) # <--- skip a constant amount ✅ 9 objs.each { |r| jobs.push r } 10 11 if objs.count == 0 12 break 13 end 14 end 15 end
Então, com nosso loop existente para criar e iniciar os threads, temos algo assim:
1 def self.run(batch_size, thread_pool_size, not_before) 2 jobs = Queue.new 3 4 objs = Obj.where(...).order(created_at: -1) 5 objs = objs.where(:created_at.gte => not_before) 6 objs = objs.limit(step) 7 objs.each { |o| jobs.push o } 8 9 updated = 0 10 last_created_at = "" # <--- we update this variable... 11 12 workers = (thread_pool_size).times.map do |i| 13 Thread.new do 14 begin 15 while j = jobs.pop(true) 16 Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at) 17 begin 18 # process documents 19 updated += 1 20 last_created_at = obj.created_at # <--- ...with each document processed 21 rescue 22 next 23 end 24 end 25 end 26 end 27 end 28 29 loop do 30 skip_value = batch_size 31 step = 10000 32 33 if jobs.size < 1000 34 objs = Obj.where(...).order(created: -1) 35 objs = objs.where(:created_at.gte => not_before) 36 objs = objs.skip(skip_value).limit(step) 37 38 objs.each { |r| jobs.push r } 39 skip_value += step 40 41 if objs.count == 0 42 break 43 end 44 end 45 end 46 workers.map(&:join) 47 end
Com isso, finalmente fizemos a fila adicionar a si mesma quando estivesse pronta. Mas na primeira vez que executamos isso, vimos algo surpreendente. O lote inicial de documentos 50,000 foi processado rapidamente e, em seguida, o próximo lote adicionado por nossa fila de adição automática foi processado muito lentamente. Executamos
top -H
para verificar o uso da CPU e da memória de nosso script no pod do Kubernetes e vimos que ele estava usando 90% da CPU do sistema:Adicionar algumas declarações
sleep
entre as iterações de loop nos ajuda a reduzir o uso da CPU a 6% muito razoáveis para o processo principal.Com essas otimizações resolvidas, Eric e eu conseguimos concluir nosso preenchimento com uma taxa de processamento de 800+ documentos/minuto sem intervenção manual. Eba!