Definition
New in version 7.0: sp.process() creates an ephemeral stream processor on the current stream processing workspace.
Compatibility
This method is supported in Atlas Stream Processing Workspaces.
Syntax
The sp.process() method has the following syntax:
sp.process( [ <pipeline> ] )
Command Fields
sp.process() takes these fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
Behavior
sp.process() creates an ephemeral, unnamed stream
processor on the current stream processing workspace and immediately
initializes it. This stream processor only persists as long as it
runs. If you terminate an ephemeral stream processor, you must create
it again in order to use it.
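Because an ephemeral processor cannot be restarted, it can help to keep the pipeline in a variable so that the same array can be passed to sp.process() again. The following is a minimal sketch, not a definitive pattern: minimalPipeline and isPipelineArray are hypothetical names introduced for illustration, the connection name is borrowed from the example later on this page, and the shape check only reflects the general rule that each aggregation stage is a document with a single stage operator key.

```javascript
// Hypothetical variable: holds the pipeline so it can be reused after the
// ephemeral processor is terminated.
const minimalPipeline = [
  // Assumes a connection named sample_stream_solar exists in the workspace.
  { $source: { connectionName: "sample_stream_solar" } },
  { $match: { $expr: { $ne: ["$device_id", "device_8"] } } }
];

// Hypothetical helper: returns true only for a non-empty array in which
// every element is a plain object with exactly one key (the stage operator).
function isPipelineArray(value) {
  return (
    Array.isArray(value) &&
    value.length > 0 &&
    value.every(
      (stage) =>
        stage !== null &&
        typeof stage === "object" &&
        !Array.isArray(stage) &&
        Object.keys(stage).length === 1
    )
  );
}

// sp is only defined in mongosh when connected to a stream processing
// workspace, so the call is guarded to keep this sketch runnable elsewhere.
if (typeof sp !== "undefined" && isPipelineArray(minimalPipeline)) {
  sp.process(minimalPipeline);
}
```

To start the processor again after terminating it, call sp.process(minimalPipeline) a second time; each call creates a new ephemeral processor.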
Access Control
The user running sp.process() must have the
atlasAdmin role.
Example
The following example creates an ephemeral stream processor
that ingests data from the sample_stream_solar connection. The
processor excludes all documents where the value of the device_id
field is device_8, passing the rest to a tumbling window with a 10-second
duration. Each window groups the documents it receives by device_id, then
returns the maximum temperature and the maximum, minimum, average, and
approximate median wattage for each group. The stream processor then
merges these records into solar_db.solar_coll over the mongodb1
connection.
sp.process(
  [
    {
      $source: {
        connectionName: 'sample_stream_solar',
        timeField: { $dateFromString: { dateString: '$timestamp' } }
      }
    },
    {
      $match: { $expr: { $ne: [ "$device_id", "device_8" ] } }
    },
    {
      $tumblingWindow: {
        interval: { size: Int32(10), unit: "second" },
        "pipeline": [
          {
            $group: {
              "_id": { "device_id": "$device_id" },
              "max_temp": { $max: "$obs.temp" },
              "max_watts": { $max: "$obs.watts" },
              "min_watts": { $min: "$obs.watts" },
              "avg_watts": { $avg: "$obs.watts" },
              "median_watts": { $median: { input: "$obs.watts", method: "approximate" } }
            }
          }
        ]
      }
    },
    {
      $merge: {
        into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
        on: ["_id"]
      }
    }
  ]
)
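The same pipeline can be easier to review and adjust when each stage is bound to a name before the array is assembled. This is one possible arrangement, offered as a sketch: the variable names (sourceStage, excludeDevice8, statsPerDevice, tenSecondWindow, mergeToSolarColl, solarPipeline) and the toInt32 fallback are assumptions made for illustration, while the stage contents are copied from the example above.

```javascript
// Int32 is a mongosh global. The fallback to a plain number is an assumption
// that keeps this sketch loadable outside mongosh.
const toInt32 = typeof Int32 === "function" ? (n) => Int32(n) : (n) => n;

// Stage 1: read from the sample connection and parse the timestamp string.
const sourceStage = {
  $source: {
    connectionName: "sample_stream_solar",
    timeField: { $dateFromString: { dateString: "$timestamp" } }
  }
};

// Stage 2: drop every document reported by device_8.
const excludeDevice8 = {
  $match: { $expr: { $ne: ["$device_id", "device_8"] } }
};

// Inner stage: per-device statistics computed inside each window.
const statsPerDevice = {
  $group: {
    _id: { device_id: "$device_id" },
    max_temp: { $max: "$obs.temp" },
    max_watts: { $max: "$obs.watts" },
    min_watts: { $min: "$obs.watts" },
    avg_watts: { $avg: "$obs.watts" },
    median_watts: { $median: { input: "$obs.watts", method: "approximate" } }
  }
};

// Stage 3: 10-second tumbling window that runs the grouping stage.
const tenSecondWindow = {
  $tumblingWindow: {
    interval: { size: toInt32(10), unit: "second" },
    pipeline: [statsPerDevice]
  }
};

// Stage 4: merge the windowed results into solar_db.solar_coll.
const mergeToSolarColl = {
  $merge: {
    into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
    on: ["_id"]
  }
};

const solarPipeline = [sourceStage, excludeDevice8, tenSecondWindow, mergeToSolarColl];

// sp exists only in mongosh connected to a stream processing workspace.
if (typeof sp !== "undefined") {
  sp.process(solarPipeline);
}
```

Naming the stages makes small changes local: for example, a different window length only touches tenSecondWindow, and the array passed to sp.process() stays the same.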