sp.createStreamProcessor()
Definition
sp.createStreamProcessor()
Creates a Stream Processor on the current Stream Processing Instance.
You can only invoke this command while connected to a stream processing instance.
This command requires mongosh version ≥ 2.0.
Compatibility
This method is supported in Atlas Stream Processing Instances.
Syntax
The sp.createStreamProcessor() method has the following syntax:
sp.createStreamProcessor( <name>, [ <pipeline> ], { <options> } )
Command Fields
sp.createStreamProcessor() takes these fields:
Field | Type | Necessity | Description |
---|---|---|---|
name | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. |
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
options | object | Optional | Object defining various optional settings for your stream processor. |
options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field. |
options.dlq.connectionName | string | Conditional | Label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field. |
options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field. |
options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field. |
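As a rough illustration of how the option fields in the table fit together, the sketch below assembles an options object with a dead letter queue. The helper buildDlqOptions is hypothetical and not part of mongosh, and the names mongodb1, dlq_db, and dlq_coll are placeholders chosen for this sketch.

```javascript
// Hypothetical helper (not part of mongosh): builds the `options` argument
// and checks that all three options.dlq fields from the table are present.
function buildDlqOptions(connectionName, db, coll) {
  const dlq = { connectionName, db, coll };
  for (const [key, value] of Object.entries(dlq)) {
    if (typeof value !== "string" || value.length === 0) {
      throw new Error(`options.dlq.${key} must be a non-empty string`);
    }
  }
  return { dlq };
}

// Placeholder names; substitute a connection from your own connection registry.
const options = buildDlqOptions("mongodb1", "dlq_db", "dlq_coll");

// In mongosh, the object would be passed as the third argument, for example:
//   sp.createStreamProcessor("myProcessor", pipeline, options)
// where `pipeline` is your own array of stages.
```

Validating the three fields together mirrors the table: once options.dlq is defined, connectionName, db, and coll are all necessary.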
Behavior
sp.createStreamProcessor() creates a persistent, named stream processor on the current stream processing instance. You can initialize this stream processor with sp.processor.start(). If you try to create a stream processor with the same name as an existing stream processor, mongosh will return an error.
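The create-then-start flow described here can be sketched as a small helper. This is only an illustration: createAndStart is a hypothetical function, the spApi parameter stands in for the mongosh sp object so the sketch can be exercised outside the shell, and the assumption that a duplicate name surfaces as a thrown error has not been checked against every mongosh version.

```javascript
// Hypothetical helper: creates a stream processor, then starts it.
// `spApi` stands in for the mongosh `sp` object.
function createAndStart(spApi, name, pipeline, options) {
  try {
    spApi.createStreamProcessor(name, pipeline, options);
  } catch (err) {
    // Assumption: creating a processor whose name already exists
    // surfaces as a thrown error.
    return { started: false, error: err.message };
  }
  // Same as calling sp.<name>.start() in mongosh.
  spApi[name].start();
  return { started: true, error: null };
}
```

Inside mongosh, a call might look like createAndStart(sp, "solarDemo", pipeline), with pipeline set to your own array of stages.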
Access Control
The user running sp.createStreamProcessor() must have the atlasAdmin role.
Example
The following example creates a stream processor named solarDemo, which ingests data from the sample_stream_solar connection. The processor excludes all documents where the value of the device_id field is device_8, passing the rest to a tumbling window with a 10-second duration. Each window groups the documents it receives by device_id, then returns summary statistics for each group: the maximum temperature and the maximum, minimum, average, and median wattage. The stream processor then merges these records to solar_db.solar_coll over the mongodb1 connection.
sp.createStreamProcessor(
  'solarDemo',
  [
    {
      $source: {
        connectionName: 'sample_stream_solar',
        timeField: { $dateFromString: { dateString: '$timestamp' } }
      }
    },
    {
      $match: { $expr: { $ne: [ "$device_id", "device_8" ] } }
    },
    {
      $tumblingWindow: {
        interval: { size: NumberInt(10), unit: "second" },
        "pipeline": [
          {
            $group: {
              "_id": { "device_id": "$device_id" },
              "max_temp": { $max: "$obs.temp" },
              "max_watts": { $max: "$obs.watts" },
              "min_watts": { $min: "$obs.watts" },
              "avg_watts": { $avg: "$obs.watts" },
              "median_watts": { $median: { input: "$obs.watts", method: "approximate" } }
            }
          }
        ]
      }
    },
    {
      $merge: {
        into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
        on: ["_id"]
      }
    }
  ]
)
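To make the filter-and-window logic of this pipeline easier to follow, here is a plain JavaScript simulation of it. It is not how Atlas Stream Processing is implemented. The function tumblingWindowStats and the sample documents are invented for this sketch, the windowStart field is added only to make the windows visible, window boundaries are assumed to align to multiples of the window size, and the median is left out for brevity.

```javascript
// Plain-JavaScript simulation of the example pipeline's logic (not Atlas code).
// Assumption: windows are aligned to multiples of the window size.
function tumblingWindowStats(docs, sizeSeconds) {
  const sizeMs = sizeSeconds * 1000;
  const groups = new Map();
  for (const doc of docs) {
    if (doc.device_id === "device_8") continue; // mirrors the $match stage
    const ts = Date.parse(doc.timestamp); // mirrors timeField
    const windowStart = Math.floor(ts / sizeMs) * sizeMs;
    const key = `${windowStart}|${doc.device_id}`; // one group per window and device
    const group = groups.get(key) ??
      { windowStart, device_id: doc.device_id, watts: [], temps: [] };
    group.watts.push(doc.obs.watts);
    group.temps.push(doc.obs.temp);
    groups.set(key, group);
  }
  // Mirrors the $group stage, minus the median.
  return [...groups.values()].map((g) => ({
    _id: { device_id: g.device_id },
    windowStart: new Date(g.windowStart).toISOString(),
    max_temp: Math.max(...g.temps),
    max_watts: Math.max(...g.watts),
    min_watts: Math.min(...g.watts),
    avg_watts: g.watts.reduce((a, b) => a + b, 0) / g.watts.length,
  }));
}

// Invented sample documents shaped like the fields the pipeline reads.
const sample = [
  { device_id: "device_1", timestamp: "2024-01-01T00:00:01Z", obs: { watts: 100, temp: 10 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:09Z", obs: { watts: 200, temp: 12 } },
  { device_id: "device_8", timestamp: "2024-01-01T00:00:05Z", obs: { watts: 999, temp: 99 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:11Z", obs: { watts: 50, temp: 9 } },
];
const results = tumblingWindowStats(sample, 10);
```

With this sample, the device_8 document is dropped and the remaining three documents fall into two windows, so results holds two records for device_1.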