
sp.process()

Creates an ephemeral stream processor on the current stream processing instance.

You can only invoke this command while connected to a stream processing instance.

This command requires mongosh version ≥ 2.0.

This method is supported in Atlas Stream Processing Instances.

The sp.process() method has the following syntax:

sp.process(
  [
    <pipeline>
  ],
  {
    <options>
  }
)
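For instance, a minimal sketch of an invocation that applies only a $source stage, assuming your connection registry contains a connection named sample_stream_solar (any equivalent streaming connection works the same way):

sp.process(
  [
    {
      $source: {
        connectionName: "sample_stream_solar"
      }
    }
  ]
)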

sp.createStreamProcessor() takes the following fields:

| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| name | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. |
| pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
| options | object | Optional | Object defining various optional settings for your stream processor. |
| options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field. |
| options.dlq.connectionName | string | Conditional | Label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field. |
| options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field. |
| options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field. |
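As a sketch of how the dead letter queue settings fit together, the following call passes the options document described above. It assumes a connection registry connection named atlasCluster that points at an Atlas cluster; the connection, database, and collection names here are placeholders, not values from this page:

sp.process(
  [
    {
      $source: {
        connectionName: "sample_stream_solar"
      }
    }
  ],
  {
    dlq: {
      connectionName: "atlasCluster",  // assumed Atlas cluster connection in your registry
      db: "streamDlq",                 // placeholder database name
      coll: "rejectedDocs"             // placeholder collection name
    }
  }
)

With a configuration like this, documents that cannot be processed are routed to the streamDlq.rejectedDocs collection on that cluster.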

sp.process() creates an ephemeral, unnamed stream processor on the current stream processing instance and immediately initializes it. This stream processor only persists as long as it runs. If you terminate an ephemeral stream processor, you must create it again in order to use it.

The user running sp.process() must have the atlasAdmin role.

The following example creates an ephemeral stream processor that ingests data from the sample_stream_solar connection. The processor excludes all documents in which the value of the device_id field is device_8, passing the rest to a tumbling window with a 10-second duration. Each window groups the documents it receives, then computes summary statistics (maximum, minimum, average, and median) for each group. The stream processor then merges these records into solar_db.solar_coll over the mongodb1 connection.

sp.process(
  [
    // Ingest documents from the sample_stream_solar connection, using the
    // timestamp field as the event time.
    {
      $source: {
        connectionName: 'sample_stream_solar',
        timeField: {
          $dateFromString: {
            dateString: '$timestamp'
          }
        }
      }
    },
    // Exclude documents reported by device_8.
    {
      $match: {
        $expr: {
          $ne: [
            "$device_id",
            "device_8"
          ]
        }
      }
    },
    // Group the remaining documents into 10-second tumbling windows and
    // compute per-device statistics for each window.
    {
      $tumblingWindow: {
        interval: {
          size: NumberInt(10),
          unit: "second"
        },
        pipeline: [
          {
            $group: {
              _id: { device_id: "$device_id" },
              max_temp: { $max: "$obs.temp" },
              max_watts: { $max: "$obs.watts" },
              min_watts: { $min: "$obs.watts" },
              avg_watts: { $avg: "$obs.watts" },
              median_watts: {
                $median: {
                  input: "$obs.watts",
                  method: "approximate"
                }
              }
            }
          }
        ]
      }
    },
    // Merge the windowed results into solar_db.solar_coll over the mongodb1
    // connection.
    {
      $merge: {
        into: {
          connectionName: "mongodb1",
          db: "solar_db",
          coll: "solar_coll"
        },
        on: ["_id"]
      }
    }
  ]
)
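After the processor has run for at least one window interval, you can check the merged results by connecting mongosh to the Atlas cluster behind the mongodb1 connection and querying the target collection. A minimal sketch, assuming you have a connection string for that cluster:

// Run against the Atlas cluster, not the stream processing instance.
use solar_db
db.solar_coll.find().limit(5)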