
Manage Stream Processors

On this page

  • Prerequisites
  • Considerations
  • Create a Stream Processor Interactively
  • Connect to your stream processing instance.
  • Define a pipeline.
  • Create a stream processor.
  • Create a Stream Processor
  • Start a Stream Processor
  • Stop a Stream Processor
  • Modify a Stream Processor
  • Limitations
  • To modify a stream processor:
  • Drop a Stream Processor
  • List Available Stream Processors
  • Sample from a Stream Processor
  • View Statistics of a Stream Processor

An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance its definition is stored in. Atlas Stream Processing supports up to 4 stream processors per worker. For additional processors that exceed this limit, Atlas Stream Processing allocates a new resource.
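As a rough illustration of the per-worker limit, the following sketch estimates how many workers a given number of stream processors would occupy. The helper name workersNeeded is hypothetical, and the sketch assumes that each additional resource is another worker with the same limit of 4 processors, which the paragraph above doesn't state explicitly.

```javascript
// Maximum number of stream processors per worker, as stated above.
const PROCESSORS_PER_WORKER = 4;

// Hypothetical helper: estimate the number of workers for a processor count.
// Assumes every worker holds at most PROCESSORS_PER_WORKER processors.
function workersNeeded(processorCount) {
  if (!Number.isInteger(processorCount) || processorCount < 0) {
    throw new RangeError("processorCount must be a non-negative integer");
  }
  return Math.ceil(processorCount / PROCESSORS_PER_WORKER);
}

console.log(workersNeeded(4)); // 1
console.log(workersNeeded(5)); // 2
```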

To create and manage a stream processor, you must have:

Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp["special-name-stream"].stats().
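The naming rule can be sketched as a small helper that picks dot notation or standard JavaScript bracket notation for a given name. The function processorExpression is hypothetical and only builds the text of the expression. It also requires the name to start with a letter before using dot notation, which is a JavaScript identifier rule rather than something stated above.

```javascript
// Hypothetical helper: build the mongosh expression that refers to a
// stream processor with the given name.
function processorExpression(name) {
  // Alphanumeric names that start with a letter can use dot notation.
  if (/^[A-Za-z][A-Za-z0-9]*$/.test(name)) {
    return `sp.${name}`;
  }
  // Any other name needs square brackets and double quotes.
  return `sp[${JSON.stringify(name)}]`;
}

console.log(processorExpression("proc01") + ".stats()");
console.log(processorExpression("special-name-stream") + ".stats()");
```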

You can create a stream processor interactively with the sp.process() method. Stream processors that you create interactively exhibit the following behavior:

  • Write output and dead letter queue documents to the shell

  • Begin running immediately upon creation

  • Run for either 10 minutes or until the user stops them

  • Don't persist after stopping

Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.

sp.process() has the following syntax:

sp.process(<pipeline>)

Field | Type | Necessity | Description
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data.

1. Connect to your stream processing instance.

Use the connection string associated with your stream processing instance to connect using mongosh.

Example

The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
--tls --authenticationDatabase admin --username streamOwner

Provide your user password when prompted.

2. Define a pipeline.

In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.

The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46 and emits the processed messages to the output topic of the mySink connection in the connection registry:

pipeline = [
  {$source: {"connectionName": "myKafka", "topic": "stuff"}},
  {$match: { temperature: 46 }},
  {
    "$emit": {
      "connectionName": "mySink",
      "topic" : "output",
    }
  }
]

3. Create a stream processor.

The following command creates a stream processor that applies the logic defined in pipeline.

sp.process(pipeline)
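When you prototype several variations of the same pipeline, it can help to build the array from parameters. The following is a minimal sketch that reuses the myKafka and mySink connection names from the example above. The function buildTemperaturePipeline is hypothetical and isn't part of mongosh.

```javascript
// Hypothetical helper: build a pipeline like the one above for a given
// temperature value and pair of topics.
function buildTemperaturePipeline(temperature, sourceTopic, sinkTopic) {
  return [
    { $source: { connectionName: "myKafka", topic: sourceTopic } },
    { $match: { temperature: temperature } },
    { $emit: { connectionName: "mySink", topic: sinkTopic } },
  ];
}

const pipeline = buildTemperaturePipeline(46, "stuff", "output");
console.log(JSON.stringify(pipeline, null, 2));

// In mongosh, connected to a stream processing instance, you could then run:
// sp.process(pipeline)
```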

To create a stream processor:

The Atlas Administration API provides an endpoint for creating a stream processor.

Create One Stream Processor

To create a new stream processor with mongosh, use the sp.createStreamProcessor() method. It has the following syntax:

sp.createStreamProcessor(<name>, <pipeline>, <options>)

Argument | Type | Necessity | Description
name | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters.
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data.
options | object | Optional | Object defining various optional settings for your stream processor.
options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field.
options.dlq.connectionName | string | Conditional | Human-readable label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field.
options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field.
options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field.
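The conditional rules for options can be checked before you call sp.createStreamProcessor(). The following sketch is one way to do that. The function validateOptions is hypothetical, and it only checks the fields described above.

```javascript
// Hypothetical helper: return a list of problems with an options object,
// following the "Conditional" rules described above.
function validateOptions(options) {
  const problems = [];
  if (options == null) {
    return problems; // options is optional, so nothing to check
  }
  if (typeof options.dlq !== "object" || options.dlq === null) {
    problems.push("options.dlq is required when options is defined");
    return problems;
  }
  for (const field of ["connectionName", "db", "coll"]) {
    const value = options.dlq[field];
    if (typeof value !== "string" || value.length === 0) {
      problems.push(`options.dlq.${field} must be a non-empty string`);
    }
  }
  return problems;
}

console.log(validateOptions({ dlq: { connectionName: "cluster01" } }));
```

An empty array means the object satisfies these rules. It doesn't confirm that the connection, database, or collection exists.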

1. Connect to your stream processing instance.

Use the connection string associated with your stream processing instance to connect using mongosh.

Example

The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
--tls --authenticationDatabase admin --username streamOwner

Provide your user password when prompted.

2. Define a pipeline.

In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.

The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46 and emits the processed messages to the output topic of the mySink connection in the connection registry:

pipeline = [
  {$source: {"connectionName": "myKafka", "topic": "stuff"}},
  {$match: { temperature: 46 }},
  {
    "$emit": {
      "connectionName": "mySink",
      "topic" : "output",
    }
  }
]

3. Define a DLQ.

In the mongosh prompt, assign an object containing the following properties of your DLQ:

  • Connection name

  • Database name

  • Collection name

The following example defines a DLQ over the cluster01 connection, in the dlq collection of the metadata database.

deadLetter = {
  dlq: {
    connectionName: "cluster01",
    db: "metadata",
    coll: "dlq"
  }
}

4. Create a stream processor.

The following command creates a stream processor named proc01 that applies the logic defined in pipeline. Documents that throw errors in processing are written to the DLQ defined in deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

Note

Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
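To make the 45-day rule concrete, the following sketch checks whether a processor that stopped at a given time would have had its internal state discarded by a later time. The function isStateDiscarded is hypothetical, and the sketch assumes the period is measured in whole 24-hour days from the moment the processor stopped.

```javascript
// Retention period for the state of a stopped processor, as stated above.
const STATE_RETENTION_DAYS = 45;
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical helper: true if the processor has been stopped for
// 45 days or more at the time `now`.
function isStateDiscarded(stoppedAt, now) {
  const daysStopped = (now.getTime() - stoppedAt.getTime()) / MS_PER_DAY;
  return daysStopped >= STATE_RETENTION_DAYS;
}

const stoppedAt = new Date("2024-01-01T00:00:00Z");
console.log(isStateDiscarded(stoppedAt, new Date("2024-02-14T00:00:00Z"))); // false
console.log(isStateDiscarded(stoppedAt, new Date("2024-02-15T00:00:00Z"))); // true
```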

To start a stream processor:

The Atlas Administration API provides an endpoint for starting a stream processor.

Start One Stream Processor

To start an existing stream processor with mongosh, use the sp.<streamprocessor>.start() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.

For example, to start a stream processor named proc01, run the following command:

sp.proc01.start()

This method returns:

  • true if the stream processor exists and isn't currently running.

  • false if you try to start a stream processor that doesn't exist, or exists and is currently running.
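A script can branch on the boolean that start() returns. The following is a minimal sketch, assuming the return values described above. The function startAndReport and the fakeSp stand-in are hypothetical. In mongosh you would pass the real sp object instead.

```javascript
// Hypothetical helper: start a processor and report the outcome based on
// the boolean return value described above.
function startAndReport(sp, name) {
  const started = sp[name].start();
  return started
    ? `${name}: started`
    : `${name}: not started (missing or already running)`;
}

// Stand-in for the mongosh `sp` object so the sketch runs outside mongosh.
const fakeSp = {
  proc01: { start: () => true },  // exists and isn't running
  proc02: { start: () => false }, // already running
};

console.log(startAndReport(fakeSp, "proc01"));
console.log(startAndReport(fakeSp, "proc02"));
```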


To stop a stream processor:

The Atlas Administration API provides an endpoint for stopping a stream processor.

Stop One Stream Processor

To stop an existing stream processor with mongosh, use the sp.<streamprocessor>.stop() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance.

For example, to stop a stream processor named proc01, run the following command:

sp.proc01.stop()

This method returns:

  • true if the stream processor exists and is currently running.

  • false if the stream processor doesn't exist, or if the stream processor isn't currently running.

You can modify the following elements of an existing stream processor: its pipeline, its name, and its dead letter queue (DLQ).

To modify a stream processor, do the following:

  1. Stop the stream processor.

  2. Apply your update to the stream processor.

  3. Restart the stream processor.

By default, modified processors restore from the last checkpoint. Alternatively, you can set resumeFromCheckpoint=false, in which case the processor only retains summary stats. When you modify a processor with open windows, the windows are entirely recomputed on the updated pipeline.
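The three steps above can be wrapped in one helper so that a processor is never modified while running. This is a sketch only. The function modifyProcessor and the recordingSp stand-in are hypothetical, and the sketch doesn't check the return values of stop() or start().

```javascript
// Hypothetical helper: stop, modify, and restart a stream processor.
// `sp` is passed in so the sketch can run against a stand-in object.
function modifyProcessor(sp, name, pipeline, options = {}) {
  const processor = sp[name];
  processor.stop();                    // 1. Stop the stream processor.
  processor.modify(pipeline, options); // 2. Apply the update.
  processor.start();                   // 3. Restart the stream processor.
}

// Stand-in for the mongosh `sp` object that records each call in order.
const calls = [];
const recordingSp = {
  foo: {
    stop: () => calls.push("stop"),
    modify: () => calls.push("modify"),
    start: () => calls.push("start"),
  },
};

modifyProcessor(recordingSp, "foo", [{ $source: {} }], { resumeFromCheckpoint: false });
console.log(calls.join(" -> "));
```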

When the default setting resumeFromCheckpoint=true is enabled, the following limitations apply:

  • You can't modify the $source stage.

  • You can't modify the interval of your window.

  • You can't remove a window.

  • You can only modify a pipeline with a window if that window has either a $group or $sort stage in its inner pipeline.

  • You can't change an existing window type. For example, you can't change from a $tumblingWindow to a $hoppingWindow or vice versa.

  • Processors with windows may reprocess some data as a product of recalculating the windows.
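Some of these limitations can be checked by comparing the old and new pipelines before you call modify(). The following sketch covers the $source, window removal, window type, window interval, and $group or $sort rules. The function checkpointViolations is hypothetical, it compares stages with JSON.stringify (so it is sensitive to key order), and it isn't an exhaustive validator.

```javascript
// Window stage names mentioned in the limitations above.
const WINDOW_STAGES = ["$tumblingWindow", "$hoppingWindow"];

// Return the first stage whose operator is in `names`, or undefined.
function findStage(pipeline, names) {
  return pipeline.find((stage) => names.includes(Object.keys(stage)[0]));
}

// Hypothetical helper: list the limitations a pipeline change would violate
// when resumeFromCheckpoint keeps its default value of true.
function checkpointViolations(oldPipeline, newPipeline) {
  const violations = [];
  // Key-order-sensitive comparison, which is enough for a sketch.
  const same = (a, b) => JSON.stringify(a) === JSON.stringify(b);

  if (!same(findStage(oldPipeline, ["$source"]), findStage(newPipeline, ["$source"]))) {
    violations.push("$source stage modified");
  }

  const oldWindow = findStage(oldPipeline, WINDOW_STAGES);
  const newWindow = findStage(newPipeline, WINDOW_STAGES);
  if (oldWindow && !newWindow) {
    violations.push("window removed");
  } else if (oldWindow && newWindow) {
    const oldType = Object.keys(oldWindow)[0];
    const newType = Object.keys(newWindow)[0];
    if (oldType !== newType) {
      violations.push("window type changed");
    } else if (!same(oldWindow[oldType].interval, newWindow[newType].interval)) {
      violations.push("window interval modified");
    }
  }

  // A pipeline with a window can only change if the window's inner
  // pipeline contains a $group or $sort stage.
  if (oldWindow && !same(oldPipeline, newPipeline)) {
    const inner = oldWindow[Object.keys(oldWindow)[0]].pipeline || [];
    if (!inner.some((stage) => "$group" in stage || "$sort" in stage)) {
      violations.push("window lacks $group or $sort");
    }
  }
  return violations;
}
```

If the returned array isn't empty, the change would need resumeFromCheckpoint set to false.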

Requires mongosh v2.3.4+.

Use the sp.<streamprocessor>.modify() command to modify an existing stream processor. <streamprocessor> must be the name of a stopped stream processor defined for the current stream processing instance.

sp.<streamprocessor>.modify() has the following syntax, shown here for a stream processor named proc01:

sp.proc01.modify(<pipeline>, {
  resumeFromCheckpoint: bool, // optional
  name: string, // optional
  dlq: string, // optional
})
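
The following example creates a stream processor named foo that merges change events from the test.test collection into the testout.testout collection, then starts it:

sp.createStreamProcessor("foo", [
  {$source: {
    connectionName: "StreamsAtlasConnection",
    db: "test",
    coll: "test"
  }},
  {$merge: {
    into: {
      connectionName: "StreamsAtlasConnection",
      db: "testout",
      coll: "testout"
    }
  }}
])
sp.foo.start();

The following example stops foo, adds a $match stage that keeps only insert events, redirects the output to the testout.testout2 collection, and restarts the processor:

sp.foo.stop();
sp.foo.modify([
  {$source: {
    connectionName: "StreamsAtlasConnection",
    db: "test",
    coll: "test"
  }},
  {$match: {
    operationType: "insert"
  }},
  {$merge: {
    into: {
      connectionName: "StreamsAtlasConnection",
      db: "testout",
      coll: "testout2"
    }
  }}
]);
sp.foo.start();

The following example changes the $source stage so that the processor starts reading from five minutes in the past. Because you can't modify the $source stage when resuming from a checkpoint, the example sets resumeFromCheckpoint to false:

sp.foo.stop();
const now = new Date(); // reference time for startAtOperationTime
sp.foo.modify([
  {$source: {
    connectionName: "StreamsAtlasConnection",
    db: "test",
    coll: "test",
    config: {
      startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
    }
  }},
  {$match: {
    operationType: "insert"
  }},
  {$merge: {
    into: {
      connectionName: "StreamsAtlasConnection",
      db: "testout",
      coll: "testout2"
    }
  }}
], {resumeFromCheckpoint: false});
sp.foo.start();

The following example passes an empty dlq document to clear the processor's dead letter queue configuration:

sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();

The following example replaces the pipeline with one that unwraps each change event's fullDocument, keeps documents with a cost greater than 500, and computes a per-customer sum and average over one-day tumbling windows:

sp.foo.stop();
sp.foo.modify([
  {$source: {
    connectionName: "StreamsAtlasConnection",
    db: "test",
    coll: "test"
  }},
  {$replaceRoot: {newRoot: "$fullDocument"}},
  {$match: {cost: {$gt: 500}}},
  {$tumblingWindow: {
    interval: {unit: "day", size: 1},
    pipeline: [
      {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
    ]
  }},
  {$merge: {
    into: {
      connectionName: "StreamsAtlasConnection",
      db: "testout",
      coll: "testout"
    }
  }}
], {resumeFromCheckpoint: false});
sp.foo.start();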

The Atlas Administration API provides an endpoint for modifying a stream processor.

Modify One Stream Processor

To drop a stream processor:

The Atlas Administration API provides an endpoint for deleting a stream processor.

Delete One Stream Processor

To delete an existing stream processor with mongosh, use the sp.<streamprocessor>.drop() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.

For example, to drop a stream processor named proc01, run the following command:

sp.proc01.drop()

This method returns:

  • true if the stream processor exists.

  • false if the stream processor doesn't exist.

When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.

To list all available stream processors:

The Atlas Administration API provides an endpoint for listing all available stream processors.

List Stream Processors

To list all available stream processors on the current stream processing instance with mongosh, use the sp.listStreamProcessors() method. It returns a list of documents containing the name, start time, current state, and pipeline associated with each stream processor. It has the following syntax:

sp.listStreamProcessors(<filter>)

<filter> is a document specifying which field(s) to filter the list by.

Example

The following example shows a return value for an unfiltered request:

sp.listStreamProcessors()
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
},
{
  id: '0218',
  name: "proc02",
  last_modified: ISODate("2023-03-21T20:17:33.601Z"),
  state: "STOPPED",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "things"
      }
    },
    {
      $match: {
        temperature: 41
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "results",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
}

If you run the command again on the same stream processing instance, filtering for a "state" of "running", you see the following output:

sp.listStreamProcessors({"state": "running"})
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
}
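If you already hold an unfiltered list, you can also summarize it on the client side. The following sketch groups processor names by state. The function namesByState is hypothetical, and the sample array is a trimmed copy of the documents shown above that assumes the list is available as a plain array.

```javascript
// Hypothetical helper: group stream processor names by their state.
function namesByState(processors) {
  const groups = {};
  for (const { name, state } of processors) {
    if (!groups[state]) {
      groups[state] = [];
    }
    groups[state].push(name);
  }
  return groups;
}

// Trimmed copy of the documents in the unfiltered example above.
const listed = [
  { id: "0135", name: "proc01", state: "RUNNING" },
  { id: "0218", name: "proc02", state: "STOPPED" },
];

console.log(namesByState(listed));
```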

To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.<streamprocessor>.sample() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance. For example, the following command samples from a stream processor named proc01.

sp.proc01.sample()

This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:

{
  _dlqMessage: {
    _stream_meta: {
      source: {
        type: "<type>"
      }
    },
    errInfo: {
      reason: "<reasonForError>"
    },
    doc: {
      _id: ObjectId('<group-id>'),
      ...
    },
    processorName: '<procName>',
    instanceName: '<instanceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}

You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
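One way to use these messages is to separate them from valid documents and count the error reasons. The following is a minimal sketch that assumes you have collected the sampled documents into an array. The function splitSample, the sample documents, and the reason strings are all hypothetical.

```javascript
// Hypothetical helper: split sampled documents into valid documents and a
// tally of error reasons taken from _dlqMessage.errInfo.reason.
function splitSample(documents) {
  const valid = [];
  const reasons = {};
  for (const doc of documents) {
    if (doc._dlqMessage) {
      const reason = doc._dlqMessage.errInfo.reason;
      reasons[reason] = (reasons[reason] || 0) + 1;
    } else {
      valid.push(doc);
    }
  }
  return { valid, reasons };
}

// Hypothetical sampled documents. The reason text is made up.
const sampled = [
  { temperature: 46 },
  { _dlqMessage: { errInfo: { reason: "example reason A" }, doc: {} } },
  { _dlqMessage: { errInfo: { reason: "example reason A" }, doc: {} } },
  { temperature: 41 },
];

console.log(splitSample(sampled));
```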


To view statistics of a stream processor:

The Atlas Administration API provides an endpoint for viewing the statistics of a stream processor.

Get One Stream Processor

To return a document summarizing the current status of an existing stream processor with mongosh, use the sp.<streamprocessor>.stats() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance. It has the following syntax:

sp.<streamprocessor>.stats({options: {<options>}})

Where options is an optional document with the following fields:

Field | Type | Description
scale | integer | Unit to use for the size of items in the output. By default, Atlas Stream Processing displays item size in bytes. To display in KB, specify a scale of 1024.
verbose | boolean | Flag that specifies the verbosity level of the output document. If set to true, the output document contains a subdocument that reports the statistics of each individual operator in your pipeline. Defaults to false.

The output document has the following fields:

Field | Type | Description
ns | string | The namespace the stream processor is defined in.
stats | object | A document describing the operational state of the stream processor.
stats.name | string | The name of the stream processor.
stats.status | string | The status of the stream processor. This field can have the following values: starting, running, error, stopping.
stats.scaleFactor | integer | The scale in which the size field displays. If set to 1, sizes display in bytes. If set to 1024, sizes display in kilobytes.
stats.inputMessageCount | integer | The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the $source stage, not when it passes through the entire pipeline.
stats.inputMessageSize | integer | The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the $source stage, not when they pass through the entire pipeline.
stats.outputMessageCount | integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline.
stats.outputMessageSize | integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline.
stats.dlqMessageCount | integer | The number of documents sent to the Dead Letter Queue.
stats.dlqMessageSize | integer | The number of bytes or kilobytes sent to the Dead Letter Queue.
stats.changeStreamTimeDifferenceSecs | integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog.
stats.changeStreamState | token | The most recent change stream resume token. Only applies to stream processors with a change stream source.
stats.stateSize | integer | The number of bytes used by windows to store processor state.
stats.watermark | integer | The timestamp of the current watermark.
stats.operatorStats | array | The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the verbose option. stats.operatorStats provides per-operator versions of many core stats fields: stats.operatorStats.name, stats.operatorStats.inputMessageCount, stats.operatorStats.inputMessageSize, stats.operatorStats.outputMessageCount, stats.operatorStats.outputMessageSize, stats.operatorStats.dlqMessageCount, stats.operatorStats.dlqMessageSize, and stats.operatorStats.stateSize. stats.operatorStats includes the following unique fields: stats.operatorStats.maxMemoryUsage and stats.operatorStats.executionTime. stats.operatorStats also includes the following fields given that you have passed in the verbose option and your processor includes a window stage: stats.minOpenWindowStartTime and stats.maxOpenWindowStartTime.
stats.operatorStats.maxMemoryUsage | integer | The maximum memory usage of the operator in bytes or kilobytes.
stats.operatorStats.executionTime | integer | The total execution time of the operator in seconds.
stats.minOpenWindowStartTime | date | The start time of the minimum open window. This value is optional.
stats.maxOpenWindowStartTime | date | The start time of the maximum open window. This value is optional.
stats.kafkaPartitions | array | Offset information for an Apache Kafka broker's partitions. kafkaPartitions applies only to connections using an Apache Kafka source.
stats.kafkaPartitions.partition | integer | The Apache Kafka topic partition number.
stats.kafkaPartitions.currentOffset | integer | The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus 1.
stats.kafkaPartitions.checkpointOffset | integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint.
stats.kafkaPartitions.isIdle | boolean | The flag that indicates whether the partition is idle. This value defaults to false.

For example, the following shows the status of a stream processor named proc01 on a stream processing instance named inst01 with item sizes displayed in KB:

sp.proc01.stats(1024)
{
  ok: 1,
  ns: 'inst01',
  stats: {
    name: 'proc01',
    status: 'running',
    scaleFactor: Long("1"),
    inputMessageCount: Long("706028"),
    inputMessageSize: 958685236,
    outputMessageCount: Long("46322"),
    outputMessageSize: 85666332,
    dlqMessageCount: Long("0"),
    dlqMessageSize: Long("0"),
    stateSize: Long("2747968"),
    watermark: ISODate("2023-12-14T14:35:32.417Z"),
    ok: 1
  },
}
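A stats document like this one can be reduced to a few derived figures. The following sketch computes the share of input documents that reached the end of the pipeline, the share sent to the DLQ, and the input size in bytes. The function summarizeStats is hypothetical, it uses plain numbers in place of Long values, and it assumes that multiplying a size by stats.scaleFactor gives an approximate size in bytes.

```javascript
// Hypothetical helper: derive ratios and a byte count from a stats document.
function summarizeStats(result) {
  const s = result.stats;
  const input = Number(s.inputMessageCount);
  const output = Number(s.outputMessageCount);
  const dlq = Number(s.dlqMessageCount);
  return {
    name: s.name,
    // Share of input documents that passed through the entire pipeline.
    outputRatio: input === 0 ? 0 : output / input,
    // Share of input documents sent to the Dead Letter Queue.
    dlqRatio: input === 0 ? 0 : dlq / input,
    // Assumption: size in bytes is the displayed size times the scale factor.
    inputBytes: Number(s.inputMessageSize) * Number(s.scaleFactor),
  };
}

// Values copied from the example output above, as plain numbers.
const summary = summarizeStats({
  stats: {
    name: "proc01",
    scaleFactor: 1,
    inputMessageCount: 706028,
    inputMessageSize: 958685236,
    outputMessageCount: 46322,
    dlqMessageCount: 0,
  },
});
console.log(summary);
```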
