Hey all, I am currently implementing an Atlas scheduled trigger function, and I have some questions about deleting or modifying documents while iterating an aggregation cursor in batches. Specifically, will doing it this way interfere with the cursor iteration itself and cause the subsequent batches returned by the cursor to contain incorrect results, because the cursor position may be "mis-placed" after documents have been deleted during the iteration?

To make my question more concrete, below is the code I am currently using for the "delete" scenario (the "update" scenario is pretty much the same, except that I would pass "update" operations instead of "delete" operations into bulkWrite(); a sketch of that variant follows the code below). My main concern is that while I am deleting the documents of the current batch during iteration, the aggregation cursor might then point to the wrong location after the deletion, and the next batch of documents returned will not be correct.

I have experimented with this scenario in mongosh: I called .batchSize(1), read one document with .next(), deleted the document that was just read, and then called .next() again on the aggregation cursor. It returned the next document correctly, without skipping anything or returning an incorrect result. However, I am not sure whether this holds true for the API used in Atlas Functions, whether it holds up for larger batches, and whether this is good practice in general, since deleting documents while iterating a cursor might cause not-so-obvious issues down the road.
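For clarity, here is roughly what that mongosh experiment lookedked like (a minimal sketch; the posts collection name and the trivial $match stage are placeholders rather than my real pipeline):

// Request the aggregation results one document per batch
const cursor = db.posts.aggregate([{ $match: {} }]).batchSize(1);

const first = cursor.next();             // read one document (first batch of 1)
db.posts.deleteOne({ _id: first._id });  // delete the document that was just read
const second = cursor.next();            // in my test this still returned the correct next document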
For reference, I found these two similar posts, but I was not able to draw a clear conclusion about the expected behavior from the discussions there.
https://www.mongodb.com/community/forums/t/deleting-resources-in-cursor-loop/8910
https://www.mongodb.com/community/forums/t/can-cursor-reads-miss-matching-documents-if-there-is-deletion-of-only-already-read-entries/155362
const postsCollection = mongodb.db(DATABASE_NAME).collection("posts");
const BATCH_SIZE = 100;

const postCursor = postsCollection.aggregate([
  {
    // Some $lookup operations
  },
  {
    // Some $match condition based on the previous $lookup stage
  },
  {
    $project: {
      "_id": 1,
    },
  },
]);
let currentBatchPosts = [];
let currentPost;
const processBatchPosts = async () => {
  // Get the ids of the post documents in the current batch
  const ids = currentBatchPosts.map((post) => post._id);

  // Delete all post documents in the current batch by their ids
  const operations = [];
  try {
    for (let i = 0; i < ids.length; i += BATCH_SIZE) {
      const currentIds = ids.slice(i, i + BATCH_SIZE);
      operations.push({
        deleteMany: {
          filter: { _id: { $in: currentIds } },
        },
      });
    }
    await postsCollection.bulkWrite(operations);
  } catch (err) {
    console.error(err);
  }

  currentBatchPosts = [];
};
// Iterate over the cursor, processing documents one batch at a time
while ((currentPost = await postCursor.next())) {
  currentBatchPosts.push(currentPost);
  if (currentBatchPosts.length === BATCH_SIZE) {
    await processBatchPosts();
  }
}

// Delete any remaining documents (if there are any) in the last batch
if (currentBatchPosts.length > 0) {
  await processBatchPosts();
}
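For completeness, this is roughly what the "update" variant mentioned at the top would look like; only the batch-processing function changes, with updateMany operations pushed into bulkWrite() instead of deleteMany. The archived field here is purely illustrative and not part of my actual schema:

const processBatchPostsUpdate = async () => {
  const ids = currentBatchPosts.map((post) => post._id);

  // Same chunking as the delete version, but push updateMany operations instead
  const operations = [];
  try {
    for (let i = 0; i < ids.length; i += BATCH_SIZE) {
      const currentIds = ids.slice(i, i + BATCH_SIZE);
      operations.push({
        updateMany: {
          filter: { _id: { $in: currentIds } },
          update: { $set: { archived: true } }, // illustrative field only
        },
      });
    }
    await postsCollection.bulkWrite(operations);
  } catch (err) {
    console.error(err);
  }

  currentBatchPosts = [];
};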