$https
定義
$https
ステージは、 リクエストを送信するための HTTPS
接続レジストリで接続を指定します。前のステージがドキュメントを$https
に渡すたびに、ステージは新しいリクエストを送信します。
構文
特定の接続に HTTPS
リクエストを送信するには、次の手順に従います。
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer> } } }
$https
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 必須 | |
| string |式 | 任意 |
例、
呼び出すAPIエンドポイントは、冪等である必要があります。 |
| ドキュメント | 任意 | |
| string | 任意 | 接続の HTTPS リクエストメソッド。次のいずれかの値である必要があります。
デフォルトは |
| ドキュメント | 任意 | ヘッダーとしてAPIエンドポイントに渡すキーと値のペアを含むドキュメント。各キーは string で、各値は string として評価される必要があります。このフィールドは、値として 式 をサポートします。 APIエンドポイントにAPIキーや Bearer アクセス トークン認証などの認証が必要な場合は、接続を定義するときに認証の詳細をヘッダーとして追加して、この演算子の一部としてプレーンテキストとしてこれらを提供する必要があります。 無効なHTTPヘッダー名と値はAPIエンドポイントに送信されません。代わりに、これらは無視されます。 無効なHTTPヘッダーの詳細については、「 RFC9110 を参照してください。 値内の式が失敗したり、string 以外のタイプに評価された場合、メッセージはデッドレターキュー (DLQ)に送信され、 演算子はこのリクエストをAPIエンドポイントに送信しません。 |
| string | 必須 | REST API応答のフィールドの名前。 エンドポイントが 0 バイトを返す場合、演算子は 演算子は、 APIエンドポイントが |
| string | 任意 | 演算子が
演算子では、すべての
演算子では、その他のHTTPステータス コードはすべて
デフォルトは |
| 配列 | 任意 | APIエンドポイントに送信されるリクエスト本文をカスタマイズできるカスタム内部パイプライン。
デフォルトでは 、メッセージ全体がAPIエンドポイントに送信されます。 Atlas Stream Processing は、緩和モードのJSONペイロードをAPIエンドポイントに送信します。 無効なHTTPリクエスト本体はAPIエンドポイントに送信されません。代わりに、これらはデッドレターキュー (DLQ)に送信されます。 無効なHTTPリクエストボディの詳細については、 RFC9110 を参照してください。 |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| integer | 任意 | 成功した デフォルトは |
| integer | 任意 | 接続できない場合に デフォルトは |
動作
$https
は$source
ステージの後に、$ $merge
ステージまたは ステージの前に配置する必要があります。 は$https
$hoppingWindow
または$tumblingWindow
内部パイプラインで使用できます。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、ingestionTime
に設定されます。Apache Kafkaプロバイダーからの各レコードに対して、
$https
ステージはhttps_weather
接続で定義された HTTPSposition.coordinates
気象ソースにリクエストを送信します。リクエストでは、HTTPSリクエストのレコードの を使用して、そのロケーションの 7 日間の高温度予測値を摂氏単位で収集し、airTemperatureForecast
フィールドのパイプラインドキュメントに追加します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。