SP.createStreamProcessor()
定義
バージョン7.0の新機能: 現在の ストリーム プロセシング インスタンス に Stream プロセッサ を作成します。
構文
sp.createStreamProcessor()
メソッドの構文は次のとおりです。
sp.createStreamProcessor( <name>, [ <pipeline> ], { <options> } )
コマンドフィールド
sp.createStreamProcessor()
次のフィールドを取ります。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
name | string | 必須 | ストリーム プロセッサの論理名。 これは、ストリーム プロセシング インスタンス内で一意である必要があります。 |
pipeline | 配列 | 必須 | データのストリーミング配信に適用するストリーム集約パイプライン。 |
options | オブジェクト | 任意 | ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。 |
options.dlq | オブジェクト | 条件付き | ストリーム プロセシング インスタンスにデッド レター キューを割り当てるオブジェクト。 このフィールドは、 options フィールドを定義する場合に必要です。 |
options.dlq.connectionName | string | 条件付き | 接続レジストリ内の接続を識別するラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 options.dlq フィールドを定義する場合に必要です。 |
options.dlq.db | string | 条件付き | options.dlq.connectionName で指定されたクラスター上の Atlas データベースの名前。 このフィールドは、 options.dlq フィールドを定義する場合に必要です。 |
options.dlq.coll | string | 条件付き | options.dlq.db で指定されるデータベース内のコレクションの名前。 このフィールドは、 options.dlq フィールドを定義する場合に必要です。 |
動作
sp.createStreamProcessor()
は、現在のストリーム プロセシング インスタンスに永続的な名前付きストリーム プロセッサを作成します。 このストリーム プロセッサは sp.processor.start()
で初期化できます。 既存のストリーム プロセッサと同じ名前でストリーム プロセッサを作成しようとすると、 mongosh
はエラーを返します。
アクセス制御
sp.createStreamProcessor()
を実行しているユーザーにはatlasAdmin
ロールが必要です。
例
次の例では、 sample_stream_solar
接続からデータを取り込むsolarDemo
という名前のストリーム プロセッサを作成します。 プロセッサは、 device_id
フィールドの値がdevice_8
であるすべてのドキュメントを除外し、残りを10秒の期間のローリングウィンドウに渡します。 各ウィンドウは受け取ったドキュメントをグループ化し、各グループのさまざまな有用な統計を返します。 次に、ストリーム プロセッサはこれらのレコードをmongodb1
接続を介してsolar_db.solar_coll
にマージします。
sp.createStreamProcessor( 'solarDemo', [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )