sp.process()
Definition
sp.process()
Creates an ephemeral Stream Processor on the current Stream Processing Instance.
You can only invoke this command while connected to a stream processing instance.
This command requires mongosh version 2.0 or later.
Compatibility
This method is supported in Atlas Stream Processing Instances.
Syntax
The sp.process() method has the following syntax:
sp.process( [ <pipeline> ], { <options> } )
Command Fields
sp.createStreamProcessor() takes the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
| string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. |
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
options | object | Optional | Object defining various optional settings for your stream processor. |
| object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the |
| string | Conditional | Label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the |
| string | Conditional | Name of an Atlas database on the cluster specified in |
| string | Conditional | Name of a collection in the database specified in |
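As a rough illustration of how the options object might be assembled, the sketch below builds a dead letter queue definition and passes it as the second argument. The nested field names (dlq, connectionName, db, coll) do not appear in the table above and are assumptions inferred from the row descriptions; buildOptions is a hypothetical helper, and the connection, database, and collection names are placeholders. The sp.process() call is guarded so that it only runs inside a mongosh session where sp is defined.

```javascript
// Hypothetical helper: builds an options object with a dead letter queue.
// The field names (dlq, connectionName, db, coll) are assumptions; check them
// against the Atlas Stream Processing reference before relying on them.
function buildOptions(connectionName, db, coll) {
  for (const [key, value] of Object.entries({ connectionName, db, coll })) {
    if (typeof value !== "string" || value.length === 0) {
      throw new Error(`dlq.${key} must be a non-empty string`);
    }
  }
  return { dlq: { connectionName, db, coll } };
}

// Placeholder names, not real resources.
const options = buildOptions("mongodb1", "solar_db", "solar_dlq");

// Minimal pipeline that reads from the sample connection used on this page.
const pipeline = [
  { $source: { connectionName: "sample_stream_solar" } }
];

// Only call sp.process() in mongosh while connected to a stream processing instance.
if (typeof sp !== "undefined") {
  sp.process(pipeline, options);
}
```

Validating the three strings up front is a design choice for this sketch: it surfaces a missing value locally instead of waiting for the server to reject the call.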
Behavior
sp.process() creates an ephemeral, unnamed stream processor on the current stream processing instance and immediately initializes it. This stream processor only persists as long as it runs. If you terminate an ephemeral stream processor, you must create it again in order to use it.
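Because an ephemeral processor has to be created again after it is terminated, it can help to keep the pipeline definition in a reusable function. The following is a minimal sketch under that assumption: solarPipeline and runEphemeral are hypothetical names, and the api parameter stands in for the mongosh sp object so the wrapper can also be exercised outside mongosh.

```javascript
// Hypothetical factory: returns a fresh pipeline definition on every call,
// so the same processor can be created again after it is terminated.
function solarPipeline(excludedDevice) {
  return [
    { $source: { connectionName: "sample_stream_solar" } },
    { $match: { $expr: { $ne: ["$device_id", excludedDevice] } } }
  ];
}

// Hypothetical wrapper: `api` is expected to be the mongosh `sp` object.
function runEphemeral(api, excludedDevice) {
  return api.process(solarPipeline(excludedDevice));
}

// In mongosh, each call creates a new ephemeral stream processor.
if (typeof sp !== "undefined") {
  runEphemeral(sp, "device_8");
}
```

Calling runEphemeral(sp, "device_8") a second time simply submits the same definition again; nothing from the earlier processor is reused.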
Access Control
The user running sp.process() must have the atlasAdmin role.
Example
The following example creates an ephemeral stream processor that ingests data from the sample_stream_solar connection. The processor excludes all documents where the value of the device_id field is device_8 and passes the rest to a tumbling window with a 10-second duration. Each window groups the documents it receives by device_id, then returns the maximum temperature and the maximum, minimum, average, and approximate median wattage of each group. The stream processor then merges these records into solar_db.solar_coll over the mongodb1 connection.
sp.process( [
  {
    $source: {
      connectionName: 'sample_stream_solar',
      timeField: { $dateFromString: { dateString: '$timestamp' } }
    }
  },
  { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } },
  {
    $tumblingWindow: {
      interval: { size: NumberInt(10), unit: "second" },
      "pipeline": [
        {
          $group: {
            "_id": { "device_id": "$device_id" },
            "max_temp": { $max: "$obs.temp" },
            "max_watts": { $max: "$obs.watts" },
            "min_watts": { $min: "$obs.watts" },
            "avg_watts": { $avg: "$obs.watts" },
            "median_watts": { $median: { input: "$obs.watts", method: "approximate" } }
          }
        }
      ]
    }
  },
  {
    $merge: {
      into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
      on: ["_id"]
    }
  }
] )
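To make the windowing step more concrete, the plain JavaScript sketch below imitates locally what the example pipeline computes for a handful of documents. It is an illustration only, not Atlas Stream Processing code: the sample documents are invented, the summarize function is hypothetical, windows are assumed to be aligned to multiples of the window size, and the approximate median is left out.

```javascript
// Local illustration only; this is not how Atlas Stream Processing is implemented.
const WINDOW_MS = 10 * 1000; // 10-second tumbling window

function summarize(docs) {
  const groups = new Map();
  for (const doc of docs) {
    if (doc.device_id === "device_8") continue; // same filter as the $match stage
    const ts = Date.parse(doc.timestamp);
    // Assumption: windows are aligned to multiples of the window size.
    const windowStart = Math.floor(ts / WINDOW_MS) * WINDOW_MS;
    const key = `${windowStart}|${doc.device_id}`;
    const g = groups.get(key) ?? {
      windowStart: new Date(windowStart).toISOString(),
      device_id: doc.device_id,
      max_temp: -Infinity,
      max_watts: -Infinity,
      min_watts: Infinity,
      sum: 0,
      count: 0
    };
    g.max_temp = Math.max(g.max_temp, doc.obs.temp);
    g.max_watts = Math.max(g.max_watts, doc.obs.watts);
    g.min_watts = Math.min(g.min_watts, doc.obs.watts);
    g.sum += doc.obs.watts;
    g.count += 1;
    groups.set(key, g);
  }
  // Replace the running sum and count with the average, like $avg.
  return [...groups.values()].map(({ sum, count, ...rest }) => ({
    ...rest,
    avg_watts: sum / count
  }));
}

// Invented sample documents shaped like the fields the pipeline reads.
const sample = [
  { device_id: "device_1", timestamp: "2024-01-01T00:00:01Z", obs: { temp: 10, watts: 100 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:07Z", obs: { temp: 14, watts: 300 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:12Z", obs: { temp: 12, watts: 50 } },
  { device_id: "device_8", timestamp: "2024-01-01T00:00:03Z", obs: { temp: 99, watts: 999 } }
];

const result = summarize(sample);
console.log(result);
```

With this sample, the first two device_1 readings fall into the same 10-second window and are summarized together, the third reading lands in the next window, and the device_8 reading is dropped before windowing.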