How We Backfilled 2 Million Database Documents
We recently needed to backfill nearly two million documents in our MongoDB database with a new attribute and wanted to share our process. First, some context on why we were doing this: This backfill was to support Netlify's Growth team, which builds prototypes into Netlify's core product and then evaluates how those prototypes impact user conversion and retention rates.
If we find that a prototype positively impacts growth, we use that finding to shape deeper investments in a particular area of the product. In this case, to measure the impact of a prototype, we needed to add an attribute that didn't previously exist to one of our database models.
With that out of the way, let's dive into how we did it!
Backend engineer Eric Betts and I started with a script from a smaller version of this task: backfilling 130,000 documents. The smaller backfill had taken about 11 hours, including time to tweak the script and restart it a few times when it died. At a backfill rate of 175-200 documents per minute, we were looking at a best-case scenario of eight to nine days straight of backfilling for over two million total documents, and that's assuming everything went off without a hitch. With a much bigger backfill ahead of us, we needed to see if we could optimize.
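For a rough sense of where that estimate comes from, here's the back-of-the-envelope math at the low end of that rate:

```ruby
docs_to_backfill = 2_000_000  # roughly two million documents
docs_per_minute  = 175        # low end of the observed backfill rate

minutes = docs_to_backfill / docs_per_minute  # ~11,428 minutes
days    = minutes / 60.0 / 24                 # ~7.9 days of nonstop backfilling
```

And since the real total was a bit over two million documents, that stretches into the eight-to-nine-day range before accounting for any restarts.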
The starting script took two arguments—a `batch_size` and a `thread_pool_size`—and it worked like this (there's a simplified sketch of the pattern right after this list):
- Create a new queue.
- Create a variable to store the number of documents we've processed.
- Query the database, limiting returned results to the `batch_size` we passed in.
- Push each returned document into the queue.
- Create the number of worker threads we passed in with the `thread_pool_size` argument.
- Each thread makes an API call to a third-party API, then writes our new attribute to our database with the result.
- Update our count of documents processed.
- When there are no more documents in the queue to process, clean up the threads.
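Boiled down, that pattern looks roughly like the sketch below. This is a simplified, self-contained illustration rather than our actual script: `fetch_from_api` and `write_attribute` are hypothetical stand-ins for the real third-party API call and database write, and placeholder hashes stand in for the query results.

```ruby
# Simplified sketch of the queue + thread pool pattern described above.
# fetch_from_api and write_attribute are hypothetical placeholders.
def backfill(batch_size, thread_pool_size)
  jobs = Queue.new
  processed = 0

  # In the real script this is a database query limited to batch_size;
  # here we just enqueue placeholder documents.
  batch_size.times { |n| jobs.push({ id: n }) }

  workers = thread_pool_size.times.map do
    Thread.new do
      begin
        # Non-blocking pop raises ThreadError once the queue is empty.
        while doc = jobs.pop(true)
          # result = fetch_from_api(doc)   # call the third-party API
          # write_attribute(doc, result)   # write the new attribute to the database
          processed += 1                   # rough progress count (not strictly thread-safe)
        end
      rescue ThreadError
        # Queue drained; let this worker exit.
      end
    end
  end

  workers.each(&:join)
  processed
end
```

The interesting part is the non-blocking `jobs.pop(true)`: it raises `ThreadError` when the queue is empty, which is what lets each worker know it's time to exit.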
The script runs on a Kubernetes pod with memory and CPU constraints. It reads from our production MongoDB database and writes to a secondary.
When scaling up the original script to process 20 times the number of documents, we quickly hit some limitations:
Pod memory constraints. Running the script with a `batch_size` of two million documents and a `thread_pool_size` of five promptly got it killed by the Kubernetes pod:

```ruby
Backfill.run(2000000, 5)
```
Too much manual intervention. Running with a `batch_size` of 100 and a `thread_pool_size` of five worked much better:

```ruby
Backfill.run(100, 5)
```

It ran super fast 🚀, there were no errors ✨... but we would have had to manually run it 20,000 times.
Third-party API rate limits. Even with a reliable `batch_size`, we couldn't crank the `thread_pool_size` too high or we'd hit rate limits at the third-party API. Our script would finish running, but many of our documents wouldn't actually be backfilled, and we'd have to iterate over them again.

Eric and I needed something that met the following criteria:
- Doesn't use so much memory that it kills the Kubernetes pod.
- Doesn't use so much memory that it noticeably increases database read/write latency.
- Iterates through a complete batch of objects at a time; the job shouldn't die before at least attempting to process a full batch.
- Requires minimal babysitting. Some manual intervention is okay, but we need a job to run for several hours by itself.
- Lets us pick up where we left off. If the job dies, we don't want to waste time re-processing documents we've already processed once.
With this list of criteria, we started brainstorming solutions. We could:
- Dig into why the script was timing out before processing the full batch.
- Store references to documents that failed to be updated, and loop back over them later.
- Find a way to order the results returned by the database.
- Automatically add more jobs to the queue once the initial batch was processed.
#1 was an obvious necessity. We started logging the thread index to see if it would tell us anything:
```ruby
def self.run(batch_size, thread_pool_size)
  jobs = Queue.new

  # get all the documents that meet these criteria
  objs = Obj.where(...)
  # limit the returned objects to the batch_size
  objs = objs.limit(batch_size)
  # push each document into the jobs queue to be processed
  objs.each { |o| jobs.push o }

  # create a thread pool
  workers = (thread_pool_size).times.map do |i|
    Thread.new do
      begin
        while obj = jobs.pop(true)
          # log the thread index and object ID
          Rails.logger.with_fields(thread: i, obj: obj.id)
          begin
            # process objects
          end
          ...
```
This new log line let us see threads die off as the script ran. We'd go from running with five threads:
```
thread="4" obj="939bpca..."
thread="1" obj="939apca..."
thread="5" obj="939cpca..."
thread="2" obj="939dpca..."
thread="3" obj="939fpca..."
thread="4" obj="969bpca..."
thread="1" obj="969apca..."
thread="5" obj="969cpca..."
thread="2" obj="969dpca..."
thread="3" obj="969fpca..."
```
to running with a few:
```
thread="4" obj="989bpca..."
thread="1" obj="989apca..."
thread="4" obj="979bpca..."
thread="1" obj="979apca..."
```
to running with none.
We realized that when a thread would hit an error in an API request or a write to our database, we were rescuing and printing the error, but not continuing with the loop. This was a simple fix: when we `rescue`, continue to the next iteration of the loop.

```ruby
begin
  # process documents
rescue
  next
end
```
In a new run of the script, we needed a way to pick up where we left off. Idea #2—keeping track of failures across iterations of the script—was technically possible, but it wasn't going to be pretty. We expected idea #3—ordering the query results—to solve the same problem in a better way, so we went with that instead. Eric came up with the idea to order our query results by `created_at` date. This way, we could pass a `not_before` date argument when running the script to ensure that we weren't processing already-processed objects. We could also print each document's `created_at` date as it was processed, so that if the script died, we could grab that date and pass it into the next run. Here's what it looked like:

```ruby
def self.run(batch_size, thread_pool_size, not_before)
  jobs = Queue.new

  # order the query results by created_at in ascending order
  objs = Obj.where(...).order(created_at: :asc)
  # get documents created on or after the not_before date
  objs = objs.where(:created_at.gte => not_before)
  # limit the returned documents to the batch_size
  objs = objs.limit(batch_size)
  # push each document into the jobs queue to be processed
  objs.each { |o| jobs.push o }

  workers = (thread_pool_size).times.map do |i|
    Thread.new do
      begin
        while obj = jobs.pop(true)
          # log each document's created_at date as it's processed
          Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at)
          begin
            # process documents
          rescue
            next
          end
          ...
```
So a log line might look like:
```
thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"
```
And if the script died after that line, we could grab that date and pass it back in:
```ruby
Backfill.run(50000, 10, "Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00")
```
Nice!
Unfortunately, when we added the ordering, we found that we unintentionally introduced a new memory limitation: the query results were sorted in memory, so we couldn't pass in too large of a batch size or we'd run out of memory on the Kubernetes pod. This lowered our batch size substantially, but we accepted the tradeoff since it eliminated the possibility of redoing work that had already been done.
The last critical task was to make our queue add to itself once the original batch of documents was processed.
Our first approach was to check the queue size, add more objects to the queue when the queue size dropped below some threshold, and re-run the original query, skipping all the returned results that we'd already processed. We stored the number we'd already processed in a variable called `skip_value`. Each time we added to the queue, we would increase `skip_value` and skip an increasingly large number of results.

You can tell where this is going. At some point, we would try to skip too large a value, run out of memory, fail to refill the queue, and the job would die.
```ruby
skip_value = batch_size
step = batch_size

loop do
  if jobs.size < 1000
    objs = Obj.where(...).order(created_at: :asc)
    objs = objs.where(:created_at.gte => not_before)
    objs = objs.skip(skip_value).limit(step) # <--- job dies when this skip_value gets too big ❌
    objs.each { |r| jobs.push r }

    skip_value += step # <--- this keeps increasing as we process more objects ❌

    if objs.count == 0
      break
    end
  end
end
```
We ultimately tossed out the increasing `skip_value`, opting instead to store the `created_at` date of the last object processed. This way, we could skip a constant, relatively low number of documents instead of slowing down and eventually killing our query by skipping an increasing number:

```ruby
refill_at = 1000
step = batch_size

loop do
  if jobs.size < refill_at
    objs = Obj.where(...).order(created_at: :asc)
    objs = objs.where(:created_at.gte => last_created_at) # <--- grab the last_created_at variable from earlier in the script ✅
    objs = objs.skip(refill_at).limit(step) # <--- skip a constant amount ✅
    objs.each { |r| jobs.push r }

    if objs.count == 0
      break
    end
  end
end
```
So, with our existing loop to create and kick off the threads, we have something like this:
```ruby
def self.run(batch_size, thread_pool_size, not_before)
  jobs = Queue.new

  objs = Obj.where(...).order(created_at: :asc)
  objs = objs.where(:created_at.gte => not_before)
  objs = objs.limit(batch_size)
  objs.each { |o| jobs.push o }

  updated = 0
  last_created_at = "" # <--- we update this variable...

  workers = (thread_pool_size).times.map do |i|
    Thread.new do
      begin
        while obj = jobs.pop(true)
          Rails.logger.with_fields(thread: i, obj: obj.id, created_at: obj.created_at)
          begin
            # process documents
            updated += 1
            last_created_at = obj.created_at # <--- ...with each document processed
          rescue
            next
          end
        end
      end
    end
  end

  refill_at = 1000
  step = 10000

  loop do
    if jobs.size < refill_at
      objs = Obj.where(...).order(created_at: :asc)
      objs = objs.where(:created_at.gte => last_created_at) # <--- pick up from the last processed document
      objs = objs.skip(refill_at).limit(step)                # <--- skip a constant amount

      objs.each { |r| jobs.push r }

      if objs.count == 0
        break
      end
    end
  end
  workers.map(&:join)
end
```
With this, we were finally getting the queue to add to itself when it was done. But the first time we ran it, we saw something surprising: the initial batch of 50,000 documents was processed quickly, and then the next batch added by our self-adding queue was processed very slowly. We ran `top -H` to check CPU and memory usage of our script on the Kubernetes pod and saw that it was using 90% of the system's CPU. Adding a few `sleep` statements between loop iterations helped us get CPU usage down to a very reasonable 6% for the main process.
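In our case the hot loop was the refill loop, so the natural place for a pause is between its iterations. Here's a sketch of roughly what that looks like—the one-second duration is an illustrative value, not necessarily what we used:

```ruby
loop do
  if jobs.size < refill_at
    objs = Obj.where(...).order(created_at: :asc)
    objs = objs.where(:created_at.gte => last_created_at)
    objs = objs.skip(refill_at).limit(step)
    objs.each { |r| jobs.push r }

    break if objs.count == 0
  end

  # Without a pause, this loop spins as fast as it can while the workers chew
  # through the queue, pegging the CPU. A short nap between iterations keeps
  # the main process mostly idle.
  sleep 1
end
```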
With these optimizations ironed out, Eric and I were able to complete our backfill at a processing rate of 800+ documents per minute with no manual intervention. Woohoo!