Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$https

在此页面上

  • 定义
  • 语法
  • 行为
  • 示例
$https

$https阶段指定 连接注册表中要将 HTTPS请求发送到的连接。每次前一阶段将文档传递给$https 时,该阶段都会发送一个新请求。

要将 HTTPS请求到给定连接,请执行以下操作:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>
}
}
}

$https 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中要将 HTTPS请求发送到的连接的标签。

path

字符串 |表达式

Optional

要附加到 connectionName 解析为的URL的路径。

示例,如果您指定解析为 https://sample.comconnectionName,则可以指定 "endpoint"path,让流处理器向 https://sample.com/endpoint 发送 HTTPS 请求。

如果将 path 定义为表达式,则该表达式的计算结果必须为字符串。

您调用的API端点应该是幂等的。

parameters

文档

Optional

包含要作为参数传递给API端点调用的键值对的文档。每个键必须是字符串,每个值的计算结果必须为数字、字符串或布尔值。该字段支持将表达式作为值。

method

字符串

Optional

连接的 HTTPS请求方法。必须是以下值之一:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

默认值为 "GET"

headers

文档

Optional

包含要作为标头传递到API端点的键值对的文档。每个键必须是字符串,每个值的计算结果也必须为字符串。该字段支持将表达式作为值。

如果API端点需要身份验证,例如API密钥或持有者访问令牌身份验证,则应在定义连接时将身份验证详细信息作为标头添加,以防止在此操作符中以明文形式提供这些信息。

无效的HTTP标头名称和值不会发送到API端点。相反,它们会被忽略。

要学习;了解有关无效HTTP 头部的详情,请参阅 RFC9110 。

如果值中的表达式失败或计算结果为字符串以外的类型,则该消息将发送到死信队列(DLQ),并且操作符不会将此请求发送到API端点。

as

字符串

必需

REST API响应的字段名称。

如果端点返回 0 字节,则该操作符不会设立as字段。

该操作符支持 Content-Typeapplication/jsontext/plain 的响应。如果API端点返回具有不同 Content-Type 的响应,则操作符将根据您定义的 onError 行为处理文档。

如果API端点返回的响应没有定义 Content-Type,则操作符假定该响应为 application/json

onError

字符串

Optional

操作符遇到与 HTTPS 相关的故障时的行为。必须是以下值之一:

该操作符将所有 2XX HTTP状态代码均视为成功。如果操作符收到以下任何HTTP状态代码作为响应,则操作符会根据您在此字段中提供的值遵循以下行为:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

该操作符将任何其他HTTP状态代码视为 "fail" 错误。示例,如果API端点返回 500 HTTP状态代码,则处理器进入失败状态并停止。

onError 由于 $https操作符本身配置不正确而引起的错误(例如无效表达式)不会触发。

默认值为 "dlq"

payload

阵列

Optional

自定义内部管道,允许您自定义发送到API端点的请求正文。 payload 支持以下表达式:

  • $project

  • $addFields

  • $replaceRoot

  • $set

默认,整个消息都会发送到API端点。 Atlas Stream Processing会向API端点发送宽松模式JSON有效负载。

无效的HTTP请求正文不会发送到API端点。相反,它们会被发送到死信队列(DLQ)。

要学习;了解有关无效HTTP请求正文的更多信息,请参阅 RFC9110 。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.connectionTimeoutSec

整型

Optional

成功的 HTTPS 连接在未收到响应的情况下超时的时间(以秒为单位)。

默认值为 30

config.requestTimeoutSec

整型

Optional

如果无法连接,则 HTTPS请求将超时的时间(以秒为单位)。

默认值为 60

$https必须位于$source 阶段之后,并且必须位于 $emit$merge 阶段之前。您可以在$https $hoppingWindow$tumblingWindow 内部管道中使用 。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为 ingestionTime

  2. 对于来自Apache Kafka代理的每条记录, $https阶段会向 连接中定义的 HTTPShttps_weather 天气源发送一个请求。该请求使用position.coordinates HTTPS请求中的记录中的 收集该位置7 天的高温预报(以摄氏度为单位),并将其添加到管道文档的airTemperatureForecast 字段。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$validate