Manage Stream Processors
An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance its definition is stored in. Atlas Stream Processing supports up to 4 stream processors per worker. For additional processors that exceed this limit, Atlas Stream Processing allocates a new resource.
Prerequisites
To create and manage a stream processor, you must have:
mongosh version 2.0 or higher
A database user with the atlasAdmin role to create and run stream processors
An Atlas cluster
Considerations
Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp["special-name-stream"].stats().
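As a rough illustration of the naming rule above, the following sketch looks up a stream processor by name with JavaScript bracket notation, which accepts names with or without special characters. It assumes that the sp object exposes each stream processor as a property keyed by its name; the helper getProcessor is hypothetical and not part of mongosh.

```javascript
// Hypothetical helper (not part of mongosh): look up a stream processor by name.
// Assumes the object passed in exposes each processor as a property keyed by name.
function getProcessor(spObject, name) {
  // Bracket notation accepts any string, including hyphens and full stops.
  const processor = spObject[name];
  if (processor === undefined) {
    throw new Error(`No stream processor named "${name}"`);
  }
  return processor;
}

// In a connected mongosh session, usage might look like:
//   getProcessor(sp, "special-name-stream").stats()
```

Passing the name as a string keeps the lookup identical for alphanumeric and non-alphanumeric names.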
Create a Stream Processor Interactively
You can create a stream processor interactively with the sp.process() method. Stream processors that you create interactively exhibit the following behavior:
Write output and dead letter queue documents to the shell
Begin running immediately upon creation
Run for either 10 minutes or until the user stops them
Don't persist after stopping
Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.
sp.process() has the following syntax:
sp.process(<pipeline>)
Field | Type | Necessity | Description |
---|---|---|---|
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using mongosh.
Example
The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
--tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.
The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46, and emits the processed messages to the output topic of the mySink connection in the connection registry:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
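With a pipeline defined, the remaining step is to pass it to sp.process(). The sketch below repeats the example pipeline under a hypothetical variable name, interactivePipeline, and guards the call so that it runs only where sp exists (that is, in a mongosh session connected to a stream processing instance). It assumes the myKafka and mySink connections from the example are present in your connection registry.

```javascript
// Same stages as the example pipeline; the variable name is illustrative only.
const interactivePipeline = [
  { $source: { connectionName: "myKafka", topic: "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { connectionName: "mySink", topic: "output" } }
];

// `sp` is only defined inside mongosh; outside it, this block is skipped.
if (typeof sp !== "undefined") {
  // Starts an interactive, non-persistent stream processor.
  sp.process(interactivePipeline);
}
```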
Create a Stream Processor
To create a stream processor:
The Atlas Administration API provides an endpoint for creating a stream processor.
To create a new stream processor with mongosh, use the sp.createStreamProcessor() method. It has the following syntax:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Type | Necessity | Description |
---|---|---|---|
name | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters. |
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
options | object | Optional | Object defining various optional settings for your stream processor. |
options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field. |
options.dlq.connectionName | string | Conditional | Human-readable label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field. |
options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field. |
options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field. |
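The conditional requirements in the table can be checked before calling sp.createStreamProcessor(). The following is a minimal sketch of such a check; validateOptions is a hypothetical helper, not part of mongosh, and it only mirrors the rules stated in the table (it does not verify that the connection, database, or collection actually exist).

```javascript
// Hypothetical pre-flight check mirroring the argument table above.
// Returns a list of problems; an empty list means the options look well formed.
function validateOptions(options) {
  if (options === undefined) {
    return []; // options is optional
  }
  if (typeof options.dlq !== "object" || options.dlq === null) {
    return ["options.dlq is required when options is defined"];
  }
  const problems = [];
  for (const field of ["connectionName", "db", "coll"]) {
    if (typeof options.dlq[field] !== "string") {
      problems.push(`options.dlq.${field} must be a string`);
    }
  }
  return problems;
}
```

An empty result does not guarantee that Atlas Stream Processing accepts the options; it only catches missing fields early.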
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using mongosh.
Example
The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
--tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.
The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46, and emits the processed messages to the output topic of the mySink connection in the connection registry:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Optional) Define a DLQ.
In the mongosh prompt, assign an object containing the following properties of your DLQ:
Connection name
Database name
Collection name
The following example defines a DLQ over the cluster01 connection, in the dlq collection of the metadata database.
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
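Once the pipeline and the optional DLQ object are defined, they can be passed to sp.createStreamProcessor() together with a name. The sketch below wraps that call in a hypothetical helper, createProcessor, which omits the options argument when no DLQ is supplied; the name proc01 in the usage comment is only an example.

```javascript
// Hypothetical wrapper around sp.createStreamProcessor(); not part of mongosh.
// `spObject` is expected to be the mongosh `sp` object (or a stand-in for testing).
function createProcessor(spObject, name, pipeline, options) {
  // The options argument is optional, so pass it only when it is provided.
  if (options === undefined) {
    return spObject.createStreamProcessor(name, pipeline);
  }
  return spObject.createStreamProcessor(name, pipeline, options);
}

// In a connected mongosh session, with `pipeline` and `deadLetter` defined
// as in the preceding steps, usage might look like:
//   createProcessor(sp, "proc01", pipeline, deadLetter)
```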
Start a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To start a stream processor:
The Atlas Administration API provides an endpoint for starting a stream processor.
To start an existing stream processor with mongosh, use the sp.<streamprocessor>.start() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.
For example, to start a stream processor named proc01, run the following command:
sp.proc01.start()
This method returns:
true if the stream processor exists and isn't currently running.
false if you try to start a stream processor that doesn't exist, or exists and is currently running.
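The return value can be used to report what happened. The following sketch relies on the true/false behavior described above; startProcessor is a hypothetical helper, and the message wording is illustrative.

```javascript
// Hypothetical wrapper (not part of mongosh) that turns the boolean result
// of start() into a readable message.
function startProcessor(spObject, name) {
  const started = spObject[name].start();
  return started
    ? `${name} started`
    : `${name} was not started (it doesn't exist or is already running)`;
}

// In a connected mongosh session, usage might look like:
//   startProcessor(sp, "proc01")
```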
Stop a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To stop a stream processor:
The Atlas Administration API provides an endpoint for stopping a stream processor.
To stop an existing stream processor with mongosh, use the sp.<streamprocessor>.stop() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance.
For example, to stop a stream processor named proc01, run the following command:
sp.proc01.stop()
This method returns:
true if the stream processor exists and is currently running.
false if the stream processor doesn't exist, or if the stream processor isn't currently running.
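Stopping and starting can be combined into a restart. The sketch below is one possible way to do that, assuming stop() and start() return the booleans described in this page and that both calls complete before returning; restartProcessor is a hypothetical helper.

```javascript
// Hypothetical helper (not part of mongosh): stop a processor, then start it again.
// Returns both results so the caller can see which step, if any, returned false.
function restartProcessor(spObject, name) {
  const processor = spObject[name];
  const stopped = processor.stop();   // false if missing or not running
  const started = processor.start();  // false if missing or still running
  return { stopped, started };
}

// In a connected mongosh session, usage might look like:
//   restartProcessor(sp, "proc01")
```

A result of { stopped: false, started: true } would indicate that the processor was not running beforehand.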
Drop a Stream Processor
To drop a stream processor:
The Atlas Administration API provides an endpoint for deleting a stream processor.
To delete an existing stream processor with mongosh, use the sp.<streamprocessor>.drop() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.
For example, to drop a stream processor named proc01, run the following command:
sp.proc01.drop()
This method returns:
true if the stream processor exists.
false if the stream processor doesn't exist.
When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.
List Available Stream Processors
To list all available stream processors:
The Atlas Administration API provides an endpoint for listing all available stream processors.
To list all available stream processors on the current stream processing instance with mongosh, use the sp.listStreamProcessors() method. It returns a list of documents containing the name, start time, current state, and pipeline associated with each stream processor. It has the following syntax:
sp.listStreamProcessors(<filter>)
<filter> is a document specifying which field(s) to filter the list by.
Example
The following example shows a return value for an unfiltered request:
sp.listStreamProcessors()
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
},
{
  id: '0218',
  name: "proc02",
  last_modified: ISODate("2023-03-21T20:17:33.601Z"),
  state: "STOPPED",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "things"
      }
    },
    {
      $match: {
        temperature: 41
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "results",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
}
If you run the command again on the same stream processing instance, filtering for a "state" of "running", you see the following output:
sp.listStreamProcessors({"state": "running"})
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output",
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
}
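The listed documents can also be summarized on the client side. The following sketch groups processor names by their state field; it assumes the result of sp.listStreamProcessors() can be iterated like an array of documents shaped as in the examples above. The helper namesByState and the trimmed sample data are illustrative only.

```javascript
// Hypothetical helper (not part of mongosh): group processor names by state.
function namesByState(processors) {
  const groups = {};
  for (const processor of processors) {
    if (!groups[processor.state]) {
      groups[processor.state] = [];
    }
    groups[processor.state].push(processor.name);
  }
  return groups;
}

// Trimmed stand-in for the unfiltered output shown above.
const listed = [
  { id: "0135", name: "proc01", state: "RUNNING" },
  { id: "0218", name: "proc02", state: "STOPPED" }
];

const grouped = namesByState(listed);
console.log(grouped.RUNNING.join(", "));
```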
Sample from a Stream Processor
To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.<streamprocessor>.sample() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance. For example, the following command samples from a stream processor named proc01.
sp.proc01.sample()
This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:
{
  _dlqMessage: {
    _stream_meta: {
      source: {
        type: "<type>"
      }
    },
    errInfo: {
      reason: "<reasonForError>"
    },
    doc: {
      _id: ObjectId('<group-id>'),
      ...
    },
    processorName: '<procName>',
    instanceName: '<instanceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}
You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
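As a sketch of that kind of diagnosis, the following helper separates valid sampled documents from _dlqMessage documents and counts the error reasons. It assumes you have already collected sampled documents into an array (the sample() method itself prints to STDOUT); splitSamples is a hypothetical helper, and the "unknown" label is an illustrative fallback.

```javascript
// Hypothetical helper (not part of mongosh): split sampled documents into
// valid documents and a per-reason count of dead letter queue messages.
function splitSamples(samples) {
  const valid = [];
  const reasons = {};
  for (const doc of samples) {
    if (doc._dlqMessage) {
      // Fall back to "unknown" when errInfo.reason is absent.
      const errInfo = doc._dlqMessage.errInfo || {};
      const reason = errInfo.reason || "unknown";
      reasons[reason] = (reasons[reason] || 0) + 1;
    } else {
      valid.push(doc);
    }
  }
  return { valid, reasons };
}
```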
View Statistics of a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To view statistics of a stream processor:
The Atlas Administration API provides an endpoint for viewing the statistics of a stream processor.
To return a document summarizing the current status of an existing stream processor with mongosh, use the sp.<streamprocessor>.stats() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance. It has the following syntax:
sp.<streamprocessor>.stats({options: {<options>}})
Where options is an optional document with the following fields:
Field | Type | Description |
---|---|---|
scale | integer | Unit to use for the size of items in the output. By default, Atlas Stream Processing displays item size in bytes. To display in KB, specify a scale of 1024. |
verbose | boolean | Flag that specifies the verbosity level of the output document. If set to true, the output document contains a subdocument that reports the statistics of each individual operator in your pipeline. Defaults to false. |
The output document has the following fields:
Field | Type | Description |
---|---|---|
ns | string | The namespace the stream processor is defined in. |
stats | object | A document describing the operational state of the stream processor. |
stats.name | string | The name of the stream processor. |
stats.status | string | The status of the stream processor. This field can have the following values: |
stats.scaleFactor | integer | The scale in which the size field displays. If set to 1, sizes display in bytes. If set to 1024, sizes display in kilobytes. |
stats.inputMessageCount | integer | The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the $source stage, not when it passes through the entire pipeline. |
stats.inputMessageSize | integer | The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the $source stage, not when they pass through the entire pipeline. |
stats.outputMessageCount | integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline. |
stats.outputMessageSize | integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline. |
stats.dlqMessageCount | integer | The number of documents sent to the Dead Letter Queue. |
stats.dlqMessageSize | integer | The number of bytes or kilobytes sent to the Dead Letter Queue. |
stats.changeStreamTimeDifferenceSecs | integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog. |
stats.changeStreamState | token | The most recent change stream resume token. Only applies to stream processors with a change stream source. |
stats.stateSize | integer | The number of bytes used by windows to store processor state. |
stats.watermark | integer | The timestamp of the current watermark. |
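stats.operatorStats | array | The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the verbose option. Additionally, stats.operatorStats includes the maxMemoryUsage and executionTime fields described in the following rows. |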
stats.operatorStats.maxMemoryUsage | integer | The maximum memory usage of the operator in bytes or kilobytes. |
stats.operatorStats.executionTime | integer | The total execution time of the operator in seconds. |
stats.kafkaPartitions | array | Offset information for an Apache Kafka broker's partitions. kafkaPartitions applies only to connections using an Apache Kafka source. |
stats.kafkaPartitions.partition | integer | The Apache Kafka topic partition number. |
stats.kafkaPartitions.currentOffset | integer | The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus 1. |
stats.kafkaPartitions.checkpointOffset | integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint. |
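The Kafka offset fields can be compared to see how far each partition has advanced beyond its last checkpoint. The following sketch computes currentOffset minus checkpointOffset per partition, based only on the field descriptions in the table; checkpointGaps and toNumber are hypothetical helpers, and the conversion assumes that non-number values (such as Long) produce a decimal string from toString().

```javascript
// Convert plain numbers or Long-like values to a JavaScript number.
// Assumes non-number values stringify to a decimal integer.
function toNumber(value) {
  return typeof value === "number" ? value : Number(String(value));
}

// Hypothetical helper (not part of mongosh): for each Kafka partition, report
// how far the current offset is ahead of the last checkpointed offset.
function checkpointGaps(stats) {
  const partitions = stats.kafkaPartitions || [];
  return partitions.map((p) => ({
    partition: toNumber(p.partition),
    gap: toNumber(p.currentOffset) - toNumber(p.checkpointOffset)
  }));
}
```

The helper returns an empty array when the stats document has no kafkaPartitions field, which the table says is the case for non-Kafka sources.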
For example, the following shows the status of a stream processor named proc01 on a stream processing instance named inst01 with item sizes displayed in KB:
sp.proc01.stats(1024)
{
  ok: 1,
  ns: 'inst01',
  stats: {
    name: 'proc01',
    status: 'running',
    scaleFactor: Long("1"),
    inputMessageCount: Long("706028"),
    inputMessageSize: 958685236,
    outputMessageCount: Long("46322"),
    outputMessageSize: 85666332,
    dlqMessageCount: Long("0"),
    dlqMessageSize: Long("0"),
    stateSize: Long("2747968"),
    watermark: ISODate("2023-12-14T14:35:32.417Z"),
    ok: 1
  },
}
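A stats document like this can be reduced to a few derived figures. The sketch below takes the inner stats subdocument, converts the reported sizes back to approximate bytes by multiplying by scaleFactor (following the scaleFactor description in the field table), and computes the share of input documents that passed through the entire pipeline. summarizeStats and asNumber are hypothetical helpers, not part of mongosh.

```javascript
// Convert plain numbers or Long-like values to a JavaScript number.
// Assumes non-number values stringify to a decimal integer.
function asNumber(value) {
  return typeof value === "number" ? value : Number(String(value));
}

// Hypothetical helper (not part of mongosh): derive summary figures from the
// `stats` subdocument returned by sp.<streamprocessor>.stats().
function summarizeStats(stats) {
  const scale = asNumber(stats.scaleFactor);
  const input = asNumber(stats.inputMessageCount);
  const output = asNumber(stats.outputMessageCount);
  return {
    // Sizes are reported in units of scaleFactor bytes, so this is approximate.
    inputBytes: asNumber(stats.inputMessageSize) * scale,
    outputBytes: asNumber(stats.outputMessageSize) * scale,
    // Share of published documents that passed through the whole pipeline.
    outputRatio: input === 0 ? 0 : output / input,
    dlqCount: asNumber(stats.dlqMessageCount)
  };
}

// In a connected mongosh session, usage might look like:
//   summarizeStats(sp.proc01.stats().stats)
```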