Hello to the community.
Please help me. I’m stuck with the following task:
- There is an incoming collection that collects raw analytics events.
- Every ten minutes, a scheduled trigger should run the aggregation and merge the results into another collection.
- Afterwards, the processed events should be cleaned up.
I have the following code:
// Marking all events in the collection for aggregation (set marker field)
await incomingEvents.updateMany({}, { $set: { aggregated: true }});
// Aggregating events (the pipeline starts with a $match: { aggregated: true } stage)
const result = await incomingEvents.aggregate(pipeline);
// Cleaning up processed events
await incomingEvents.deleteMany({ aggregated: true });
return result;
But it doesn’t work. It sets the marker field on the documents and deletes them, but it does not aggregate them. I checked this by commenting out lines one at a time: if I comment out the final deleteMany call, the aggregation works. But then the logic breaks, because the next run picks up the already processed documents.
And yes, I’ve checked the aggregation pipeline. Each step works fine on its own, just not when they run together.
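One possible explanation, assuming this runs against the MongoDB Node.js driver or in an Atlas Function: aggregate() returns a cursor, and awaiting the cursor itself does not send the pipeline to the server. The pipeline, including a final $merge stage, only executes once the cursor is consumed, so deleteMany may remove the documents before the aggregation ever runs. A minimal sketch of the same three steps with the cursor drained via toArray() is below. The function name processIncomingEvents and its parameters are hypothetical, not taken from the original code.

```javascript
// Sketch only: `processIncomingEvents` is a hypothetical wrapper name.
// `incomingEvents` is assumed to be a MongoDB collection handle whose
// aggregate() returns a lazy cursor (Node.js driver or Atlas Functions).
async function processIncomingEvents(incomingEvents, pipeline) {
  // 1. Mark the events that belong to this run.
  await incomingEvents.updateMany({}, { $set: { aggregated: true } });

  // 2. aggregate() only builds a cursor. Calling toArray() consumes it,
  //    which is what actually runs the pipeline on the server.
  const result = await incomingEvents.aggregate(pipeline).toArray();

  // 3. Delete only after the aggregation has finished.
  await incomingEvents.deleteMany({ aggregated: true });

  return result;
}
```

If the pipeline ends with $merge, the returned array is expected to be empty, because the output goes to the target collection rather than back to the caller.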