Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$https

項目一覧

  • 定義
  • 構文
  • 動作
$https

$httpsステージは、 リクエストを送信するための HTTPS接続レジストリで接続を指定します。前のステージがドキュメントを$https に渡すたびに、ステージは新しいリクエストを送信します。

特定の接続に HTTPSリクエストを送信するには、次の手順に従います。

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>
}
}
}

$httpsステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

HTTPSリクエストの送信先となる接続レジストリ内の接続を識別するラベル。

path

string |式

任意

connectionName が解決するURLに追加するパス。

例、https://sample.com に解決する connectionName を指定した場合、ストリーム プロセッサが https://sample.com/endpointHTTPS リクエストを送信するには、"endpoint"path を指定します。

path を式として定義する場合、その式は string として評価される必要があります。

呼び出すAPIエンドポイントは、冪等である必要があります。

parameters

ドキュメント

任意

APIエンドポイント呼び出しにパラメータとして渡すキーと値のペアを含むドキュメント。各キーは string で、各値は数値、string、またはブール値値のいずれかに評価される必要があります。このフィールドは、値として 式 をサポートします。

method

string

任意

接続の HTTPS リクエストメソッド。次のいずれかの値である必要があります。

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

デフォルトは "GET" です。

headers

ドキュメント

任意

ヘッダーとしてAPIエンドポイントに渡すキーと値のペアを含むドキュメント。各キーは string で、各値は string として評価される必要があります。このフィールドは、値として 式 をサポートします。

APIエンドポイントにAPIキーや Bearer アクセス トークン認証などの認証が必要な場合は、接続を定義するときに認証の詳細をヘッダーとして追加して、この演算子の一部としてプレーンテキストとしてこれらを提供する必要があります。

無効なHTTPヘッダー名と値はAPIエンドポイントに送信されません。代わりに、これらは無視されます。

無効なHTTPヘッダーの詳細については、「 RFC9110 を参照してください。

値内の式が失敗したり、string 以外のタイプに評価された場合、メッセージはデッドレターキュー (DLQ)に送信され、 演算子はこのリクエストをAPIエンドポイントに送信しません。

as

string

必須

REST API応答のフィールドの名前。

エンドポイントが 0 バイトを返す場合、演算子は asフィールドを設定しません。

演算子は、application/json または text/plainContent-Type を持つ応答をサポートします。 APIエンドポイントが別の Content-Type を持つ応答を返す場合、演算子は定義した onError の動作に基づいてドキュメントを処理します。

APIエンドポイントが Content-Type を定義せずに応答を返す場合、 演算子は応答が application/json であると想定します。

onError

string

任意

演算子が HTTPS 関連の障害を発生させた場合の動作。次のいずれかの値である必要があります。

  • "dlq" : 影響を受けたドキュメントをデッドレターキュー (DLQ)に渡します。

  • "ignore" : 影響を受けるドキュメントは何も行いません。

  • "fail" : エラーが発生した場合、ストリーム プロセッサを終了します。

演算子では、すべての 2XX HTTPステータス コードが成功と見なされます。演算子が応答として次のHTTPステータス コードのいずれかを受け取った場合、演算子はこのフィールドに指定した値に基づいて動作に従います。

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

演算子では、その他のHTTPステータス コードはすべて "fail" エラーと見なされます。例、 APIエンドポイントが 500 HTTPステータス コードを返す場合、プロセッサは失敗状態になり、停止します。

onError は、無効な式など、$https 演算子自体の誤った構成に発生するエラーではトリガーしません。

デフォルトは "dlq" です。

payload

配列

任意

APIエンドポイントに送信されるリクエスト本文をカスタマイズできるカスタム内部パイプライン。 payload は次の式をサポートしています。

  • $project

  • $addFields

  • $replaceRoot

  • $set

デフォルトでは 、メッセージ全体がAPIエンドポイントに送信されます。 Atlas Stream Processing は、緩和モードのJSONペイロードをAPIエンドポイントに送信します。

無効なHTTPリクエスト本体はAPIエンドポイントに送信されません。代わりに、これらはデッドレターキュー (DLQ)に送信されます。

無効なHTTPリクエストボディの詳細については、 RFC9110 を参照してください。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.connectionTimeoutSec

integer

任意

成功した HTTPS 接続が応答を受け取らない場合にタイムアウトする時間(秒単位)。

デフォルトは 30 です。

config.requestTimeoutSec

integer

任意

接続できない場合に HTTPSリクエストがタイムアウトする時間(秒単位)。

デフォルトは 60 です。

$https$source ステージの後に、$ $mergeステージまたは ステージの前に配置する必要があります。 は$https $hoppingWindowまたは$tumblingWindow 内部パイプラインで使用できます。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、 ingestionTimeに設定されます。

  2. Apache Kafkaプロバイダーからの各レコードに対して、 $httpsステージはhttps_weather 接続で定義された HTTPSposition.coordinates 気象ソースにリクエストを送信します。リクエストでは、HTTPSリクエストのレコードの を使用して、そのロケーションの 7 日間の高温度予測値を摂氏単位で収集し、airTemperatureForecast フィールドのパイプラインドキュメントに追加します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$validate