Docs Menu
Docs Home
/
MongoDB マニュアル
/ / /

SP.process()

項目一覧

  • 定義
  • 構文
  • コマンドフィールド
  • 動作
  • アクセス制御
  • 詳細
sp.process()

バージョン の新機能:7.0 現在の ストリーム プロセシング インスタンス にエフェメラル Stream プロセッサ を作成します。

sp.process()メソッドの構文は次のとおりです。

sp.process(
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 次のフィールドを取ります。

フィールド
タイプ
必要性
説明
name
string
必須
ストリーム プロセッサの論理名。 これは、ストリーム プロセシング インスタンス内で一意である必要があります。
pipeline
配列
必須
データのストリーミング配信に適用するストリーム集約パイプライン
options
オブジェクト
任意
ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。
options.dlq
オブジェクト
条件付き
ストリーム プロセシング インスタンスにデッド レター キューを割り当てるオブジェクト。 このフィールドは、 optionsフィールドを定義する場合に必要です。
options.dlq.connectionName
string
条件付き
接続レジストリ内の接続を識別するラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。
options.dlq.db
string
条件付き
options.dlq.connectionNameで指定されたクラスター上の Atlas データベースの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。
options.dlq.coll
string
条件付き
options.dlq.dbで指定されるデータベース内のコレクションの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。

sp.process() は、現在のストリーム プロセシング インスタンスにエフェメラルの名前のないストリーム プロセッサを作成し、すぐに初期化します。 このストリーム プロセッサは の実行中のみ保持されます。 エフェメラル ストリーム プロセッサを終了した場合、使用するには再度作成する必要があります。

sp.process()を実行しているユーザーにはatlasAdminロールが必要です。

次の例では、 sample_stream_solar接続からデータを取り込む一時的なストリーム プロセッサを作成しています。 プロセッサは、 device_idフィールドの値がdevice_8であるすべてのドキュメントを除外し、残りを10秒の期間のローリングウィンドウに渡します。 各ウィンドウは受け取ったドキュメントをグループ化し、各グループのさまざまな有用な統計を返します。 次に、ストリーム プロセッサはこれらのレコードをmongodb1接続を介してsolar_db.solar_collにマージします。

sp.process(
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: NumberInt(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

戻る

SP.listStreamProcessers