DLQ captures errors, but "normal" output missing?

I have this script defining a stream processor.
My expectation is that the DLQ would contain documents where the field x is missing, or isn’t an int, or is out of range.
When I create and start the processor, the DLQ indeed has the errant documents. But the other documents [{x:1}, {x:2}, {x:3}] seem to be dropped into ether: they are neither in the DLQ nor does the tumbling window pick them up. The audit logs contain nothing.

Any insight into why this is happening?

const DB_NAME = "demo"
const GOOD_ONES = "theGoodOnes"
const BAD_ONES = "theBadOnes"
const PROCESSOR_NAME = "myDlqDemoProcessor"

let processor = sp.getProcessor(PROCESSOR_NAME)

try {
  processor.stop();
  print(`Dropped "${PROCESSOR_NAME}"...`);
}
catch (e) { print(`Can't stop "${PROCESSOR_NAME}"...`) }
try {
  processor.drop();
  print(`Dropped "${PROCESSOR_NAME}"...`);
}
catch (e) { print(`Can't drop "${PROCESSOR_NAME}"...`) }

let mockDocs = [
  { x: 0 }, // small
  { x: 1 },
  { x: 3.14 }, // non-int
  { x: 2 },
  { x: "22" }, // non-int
  { x: 3 },
  { text: 'yo' }, // non-existent
  { x: 4 },
]

/***
 * Set up pipeline stages
 */

// An inline source of documents from the mock documents array
let sourceStream = {
  $source: {
    documents: mockDocs
  }
}


// validation
let documentValidator = {
  $validate: {
    validator: {
      $jsonSchema: {
        required: ["x"],
        properties: {
          x: {
            bsonType: "int",
            minimum: 1,
            maximum: 42,
            description: "Valid 'x' is an int in range [1,42]"
          }
        }
      }
    },
    validationAction: "dlq"
  }
}

// Window Function
let calculate = {
  $tumblingWindow: {
    interval: {
      size: NumberInt(3),
      unit: "second"
    },
    pipeline: [
      {
        $group: {
          _id: "allOfThem",
          theSum: { $sum: "$x" },
          theValues: { $push: "$x" }
        }
      }
    ]
  }
}

// direct output to a collection 
let finalOutput = {
  $merge: {
    into: {
      connectionName: OUTPUT_CONNECTION,
      db: DB_NAME,
      coll: GOOD_ONES
    },
    whenMatched: "replace",
    whenNotMatched: "insert"
  }
}

let processorOptions = {
  dlq: {
    // dead letter queue definition:
    connectionName: OUTPUT_CONNECTION,
    db: DB_NAME,
    coll: BAD_ONES
  }
}

let create = () => sp.createStreamProcessor(
  PROCESSOR_NAME,
  [sourceStream, documentValidator, calculate, finalOutput],
  processorOptions
);

// Start it:
// processor = create(); processor.start(); 

// Or: 
// create().start();

If there’s a failure in the aggregation, I’d expect some visibility into an error.

Hi Nuri,

Atlas Stream Processing uses event time to advance the high watermark and not the process time from the wall clock. The window you have created is holding the documents waiting for the watermark to advance enough that 3 seconds have passed. Using $source.documents causes all of the input docs to have very close times to one another and so the watermark doesn’t move enough.

You could enrich these documents to have a timestamp and use timeField in $source, or you can update the window to include the idleTimeout property which will cause the window to flush the records one it detects no more are being consumed.

Using IdleTimeout with the window

let calculate = {
  $tumblingWindow: {
    interval: {
      size: NumberInt(3),
      unit: "second"
    },
    idleTimeout : {size : 1, unit : "second"},
    pipeline: [
      {
        $group: {
          _id: "allOfThem",
          theSum: { $sum: "$x" },
          theValues: { $push: "$x" }
        }
      }
    ]
  }
}
1 Like

The closing behavior of the window as it relates to watermark is now clearer, and the issue is resolved with the idleTimeout.
Thanks for looking at this issue and resolving my confusion here!