$https
定义
$https
阶段指定 连接注册表中要将 HTTPS
请求发送到的连接。每次前一阶段将文档传递给$https
时,该阶段都会发送一个新请求。
语法
要将 HTTPS
请求到给定连接,请执行以下操作:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer> } } }
$https
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | |
| 字符串 |表达式 | Optional | 要附加到 示例,如果您指定解析为 如果将 您调用的API端点应该是幂等的。 |
| 文档 | Optional | |
| 字符串 | Optional | 连接的 HTTPS请求方法。必须是以下值之一:
默认值为 |
| 文档 | Optional | 包含要作为标头传递到API端点的键值对的文档。每个键必须是字符串,每个值的计算结果也必须为字符串。该字段支持将表达式作为值。 如果API端点需要身份验证,例如API密钥或持有者访问令牌身份验证,则应在定义连接时将身份验证详细信息作为标头添加,以防止在此操作符中以明文形式提供这些信息。 无效的HTTP标头名称和值不会发送到API端点。相反,它们会被忽略。 要学习;了解有关无效HTTP 头部的详情,请参阅 RFC9110 。 如果值中的表达式失败或计算结果为字符串以外的类型,则该消息将发送到死信队列(DLQ),并且操作符不会将此请求发送到API端点。 |
| 字符串 | 必需 | REST API响应的字段名称。 如果端点返回 0 字节,则该操作符不会设立 该操作符支持 如果API端点返回的响应没有定义 |
| 字符串 | Optional | 操作符遇到与
该操作符将所有
该操作符将任何其他HTTP状态代码视为
默认值为 |
| 阵列 | Optional | 自定义内部管道,允许您自定义发送到API端点的请求正文。
默认,整个消息都会发送到API端点。 Atlas Stream Processing会向API端点发送宽松模式JSON有效负载。 无效的HTTP请求正文不会发送到API端点。相反,它们会被发送到死信队列(DLQ)。 要学习;了解有关无效HTTP请求正文的更多信息,请参阅 RFC9110 。 |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
| 整型 | Optional | 成功的 默认值为 |
| 整型 | Optional | 如果无法连接,则 默认值为 |
行为
$https
必须位于$source
阶段之后,并且必须位于 $emit 或$merge
阶段之前。您可以在$https
$hoppingWindow
或$tumblingWindow
内部管道中使用 。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与在名为my_weatherdata
的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为ingestionTime
。对于来自Apache Kafka代理的每条记录,
$https
阶段会向 连接中定义的 HTTPShttps_weather
天气源发送一个请求。该请求使用position.coordinates
HTTPS请求中的记录中的 收集该位置7 天的高温预报(以摄氏度为单位),并将其添加到管道文档的airTemperatureForecast
字段。$merge
阶段将输出写入sample_weatherstream
数据库中名为stream
的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
要查看生成的 sample_weatherstream.stream
集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。