Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
MongoDBchevron-right

Como preenchemos 2 milhões de documentos de banco de dados

Eric Betts, Jen Kagan7 min read • Published Mar 15, 2022 • Updated May 09, 2022
KubernetesMongoDB
Ícone do FacebookÍcone do Twitterícone do linkedin
Netlify
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Recentemente, precisávamos preencher quase dois milhões de documentos em nosso MongoDB database com um novo atributo e queríamos compartilhar nosso processo. Primeiro, um contexto sobre por que estávamos fazendo isso: esse preenchimento foi para apoiar a equipe de crescimento da Netlify, que cria protótipos no produto principal da Netlify e depois avalia como esses protótipos afetam as taxas de conversão eretenção de usuários.
Se descobrirmos que um protótipo afeta positivamente o crescimento, usaremos essa descoberta para moldar investimentos mais profundos em uma área específica do produto. Nesse caso, para medir o impacto de um protótipo, precisávamos adicionar um atributo que não existia anteriormente a um de nossos modelos de banco de dados.
Com isso fora do caminho, vamos mostrar como fizemos isso!
O engenheiro de back-end Eric Betts e eu começamos com um script de uma versão menor desta tarefa: preenchimento de 130,000 documentos. O preenchimento menor levou cerca de 11 horas, incluindo tempo para ajustar o script e reiniciá-lo algumas vezes quando ele morreu. Com uma taxa de preenchimento de 175a200 documentos por minuto, estávamos olhando para o melhor cenário de oito a nove dias seguidos de preenchimento para mais de dois milhões de documentos no total, e isso supondo que tudo tenha ocorrido sem problemas. Com um preenchimento muito maior pela frente, precisvamos ver se poderíamos otimizar.
O script inicial recebeu dois argumentos - um tamanhobatch_size e thread_pool_size - e funcionou assim:
  1. Criar uma nova fila.
  2. Crie uma variável para armazenar o número de documentos que processamos.
  3. Consulte o banco de dados, limitando os resultados retornados ao batch_size que passamos.
  4. Envie cada documento retornado para a fila.
  5. Crie o número de threads de trabalho que passamos com o argumentothread_pool_size .
  6. Cada thread faz uma chamada de API para uma API de terceiros e, em seguida, grava nosso novo atributo em nosso banco de dados com o resultado da API de terceiros.
  7. Atualize nossa contagem de documentos processados.
  8. Quando não houver mais documentos na fila para processar, limpe os threads.
O roteiro é executado em um pod do Kubernetes com restrições de memória e CPU. Ele lê do nosso MongoDB database de produção e grava em um secundário.

Mais repositórios, mais problemas

Ao ampliar o script original para processar 20 vezes o número de documentos, rapidamente encontramos algumas limitações:
Restrições de memória do pod. A execução do script com batch_size de dois milhões de documentos e thread_pool_size de cinco foi imediatamente eliminada pelo pod do Kubernetes:
1Backfill.run(2000000, 5)
Muita intervenção manual. Correr com batch_size de 100 e thread_pool de cinco funcionou muito melhor:
1Backfill.run(100, 5)
Correu super rápido 🚀, não houve erros ✨ ... mas teríamos que executá-lo manualmente 20,000 vezes.
Limites de taxa de API de terceiros. Mesmo com umbatch_sizeconfiável , não podíamos aumentar muito o thread_pool_size ou atingiríamos os limites de taxa na API de terceiros. Nosso script terminaria de ser executado, mas muitos de nossos documentos não seriam preenchidos e teríamos que iterar sobre eles novamente.

Soluções de brainstorming

Eric e eu precisávamos de algo que atendesse aos seguintes critérios:
  • Não usa tanta memória a ponto de matar o pod do Kubernetes.
  • Não usa tanta memória que aumente visivelmente a latência de leitura/gravação do banco de dados.
  • Itera através de um lote completo de objetos de cada vez; a tarefa não deve terminar antes de pelo menos tentar processar um lote completo.
  • Requer o mínimo de serviço de babá. Algumas intervenções manuais são boas, mas precisamos de um trabalho para ser executado por várias horas por si só.
  • Vamos retomar de onde paramos. Se o trabalho morrer, não queremos perder tempo reprocessando documentos que já processamos uma vez.
Com essa lista de critérios, começamos a pensar em soluções. Poderíamos:
  1. Descubra por que o script estava expirando antes de processar o lote completo.
  2. Armazene referências a documentos que não foram atualizados e retorne a eles mais tarde.
  3. Encontre uma maneira de ordenar os resultados retornados pelo banco de dados.
  4. Adicionar automaticamente mais trabalhos à fila depois que o lote inicial for processado.

Otimizações

Você está dentro do tempo limite

#1 era uma necessidade obvia. Começamos a registrar o índice do thread para ver se ele nos informava algo:
1def self.run(batch_size, thread_pool_size)
2 jobs = Queue.new
3
4 # get all the documents that meet these criteria
5 objs = Obj.where(...)
6 # limit the returned objects to the batch_size
7 objs = objs.limit(batch_size)
8 # push each document into the jobs queue to be processed
9 objs.each { |o| jobs.push o }
10
11 # create a thread pool
12 workers = (thread_pool_size).times.map do |i|
13 Thread.new do
14 begin
15 while j = jobs.pop(true)
16 # log the thread index and object ID
17 Rails.logger.with_fields(thread: i, obj: obj.id)
18 begin
19 # process objects
20 end
21...
Esta nova linha de log nos permite ver os threads morrendo à medida que o script é executado. Go a executar com cinco threads:
1thread="4" obj="939bpca..."
2thread="1" obj="939apca..."
3thread="5" obj="939cpca..."
4thread="2" obj="939dpca..."
5thread="3" obj="939fpca..."
6thread="4" obj="969bpca..."
7thread="1" obj="969apca..."
8thread="5" obj="969cpca..."
9thread="2" obj="969dpca..."
10thread="3" obj="969fpca..."
para correr com alguns:
1thread="4" obj="989bpca..."
2thread="1" obj="989apca..."
3thread="4" obj="979bpca..."
4thread="1" obj="979apca..."
para correr sem nenhum.
Notamos que, quando um thread atingia um erro em uma solicitação de API ou em uma gravação em nosso banco de dados, estavámos salvando e imprimindo o erro, mas não continuando com o loop. Foi uma correção simples: quando rescue, continue com a iteraçãonextdo loop.
1 begin
2 # process documents
3 rescue
4 next
5 end

Ordem, ordem

Em uma nova execução do script, precisávamos de uma maneira de continuar de onde paramos. A ideia #2- acompanhar as falhas nas iterações do script - era tecnicamente possível, mas não seria bonita. Esperávamos que a ideia #3- ordenar os resultados da consulta - resolvesse o mesmo problema, mas de uma maneira melhor, então optamos por isso. Eric teve a ideia de ordenar os resultados da nossa consulta por created_atdata. Dessa forma, poderíamos passar um argumento de data denot_before ao executar o script para garantir que não estávamos processando objetos já processados. Também podíamos imprimir a data decreated_atde cada documento à medida que era processado, para que, se o script morresse, pudéssemos pegar essa data e passá-la para a próxima execução. Aqui está o que parecia:
1def self.run(batch_size, thread_pool_size, not_before)
2 jobs = Queue.new
3
4 # order the query results in ascending order
5 objs = Obj.where(...).order(created_at: -1)
6 # get documents created after the not_before date
7 objs = objs.where(:created_at.gte => not_before)
8 # limit the returned documents to the batch_size
9 objs = objs.limit(batch_size)
10 # push each document into the jobs queue to be processed
11 objs.each { |o| jobs.push o }
12
13 workers = (thread_pool_size).times.map do |i|
14 Thread.new do
15 begin
16 while j = jobs.pop(true)
17 # log each document's created_at date as it's processed
18 Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at)
19 begin
20 # process documents
21 rescue
22 next
23 end
24...
Então, uma linha de log pode se parecer com: thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"
E se o script morresse depois dessa linha, poderíamos pegar essa data e devolvê-la: Backfill.run(50000, 10, "Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00")
Nice!
Infelizmente, quando adicionamos a ordem, descobrimos que, sem querer, introduzimos uma nova limitação de memória: os resultados da consulta eram classificados na memória, para que não pudéssemos passar um lote muito grande ou ficaríamos sem memória no pod do Kubernetes. Isso reduziu substancialmente o tamanho do nosso lote, mas aceitamos a desvantagem, pois eliminou a possibilidade de refazer o trabalho que já havia sido feito.

O trabalho nunca termina

A última tarefa crítica era fazer com que nossa fila se completasse depois que o lote original de documentos fosse processado.
Nossa primeira abordagem foi verificar o tamanho da fila, adicionar mais objetos à fila quando o tamanho da fila atingisse algum limite e executar novamente a consulta original, mas ignorar todos os resultados da consulta retornados que já havíamos processado. Armazenamos o número que já processamos em uma variável chamada skip_value. Cada vez que adicionamos algo à fila, aumentaríamos skip_value e pularíamos um número cada vez maior de resultados.
Você pode ver aonde isso vai dar. Em algum momento, tentaríamos pular um valor muito grande, ficaríamos sem memória, não conseguiríamos reabastecer a fila e o trabalho morreria.
1 skip_value = batch_size
2 step = batch_size
3
4 loop do
5 if jobs.size < 1000
6 objs = Obj.where(...).order(created_at: -1)
7 objs = objs.where(:created_at.gte => created_at)
8 objs = objs.skip(skip_value).limit(step) # <--- job dies when this skip_value gets too big ❌
9 objs.each { |r| jobs.push r }
10
11 skip_value += step # <--- this keeps increasing as we process more objects ❌
12
13 if objs.count == 0
14 break
15 end
16 end
17 end
Por fim, descartamos o aumento skip_value, optando por armazenar a datacreated_at do último objeto processado. Dessa forma, poderíamos pular um número constante e relativamente baixo de documentos em vez de desacelerar e eventualmente encerrar nossa consulta pulando um número crescente:
1 refill_at = 1000
2 step = batch_size
3
4 loop do
5 if jobs.size < refill_at
6 objs = Obj.where(...).order(created_at: -1)
7 objs = objs.where(:created_at.gte => last_created_at) # <--- grab last_created_at constant from earlier in the script ✅
8 objs = objs.skip(refill_at).limit(step) # <--- skip a constant amount ✅
9 objs.each { |r| jobs.push r }
10
11 if objs.count == 0
12 break
13 end
14 end
15 end
Então, com nosso loop existente para criar e iniciar os threads, temos algo assim:
1def self.run(batch_size, thread_pool_size, not_before)
2 jobs = Queue.new
3
4 objs = Obj.where(...).order(created_at: -1)
5 objs = objs.where(:created_at.gte => not_before)
6 objs = objs.limit(step)
7 objs.each { |o| jobs.push o }
8
9 updated = 0
10 last_created_at = "" # <--- we update this variable...
11
12 workers = (thread_pool_size).times.map do |i|
13 Thread.new do
14 begin
15 while j = jobs.pop(true)
16 Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at)
17 begin
18 # process documents
19 updated += 1
20 last_created_at = obj.created_at # <--- ...with each document processed
21 rescue
22 next
23 end
24 end
25 end
26 end
27 end
28
29 loop do
30 skip_value = batch_size
31 step = 10000
32
33 if jobs.size < 1000
34 objs = Obj.where(...).order(created: -1)
35 objs = objs.where(:created_at.gte => not_before)
36 objs = objs.skip(skip_value).limit(step)
37
38 objs.each { |r| jobs.push r }
39 skip_value += step
40
41 if objs.count == 0
42 break
43 end
44 end
45 end
46 workers.map(&:join)
47end
Com isso, finalmente fizemos a fila adicionar a si mesma quando estivesse pronta. Mas na primeira vez que executamos isso, vimos algo surpreendente. O lote inicial de documentos 50,000 foi processado rapidamente e, em seguida, o próximo lote adicionado por nossa fila de adição automática foi processado muito lentamente. Executamos top -H para verificar o uso da CPU e da memória de nosso script no pod do Kubernetes e vimos que ele estava usando 90% da CPU do sistema:
Adicionar algumas declaraçõessleep entre as iterações de loop nos ajuda a reduzir o uso da CPU a 6% muito razoáveis para o processo principal.
Com essas otimizações resolvidas, Eric e eu conseguimos concluir nosso preenchimento com uma taxa de processamento de 800+ documentos/minuto sem intervenção manual. Eba!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Teste e empacotamento de uma biblioteca Python


Aug 14, 2024 | 8 min read
exemplo de código

Exemplo de aplicativo para cuidadores de cães


Jul 12, 2024 | 3 min read
Artigo

Estruturando dados com Serde em Rust


Apr 23, 2024 | 5 min read
Tutorial

Chat em tempo real em um jogo de Phaser com MongoDB e Socket.io


Feb 03, 2023 | 11 min read
Sumário
  • Mais repositórios, mais problemas