$https
Nesta página
Definição
O $https
estágio especifica uma conexão no Registro de conexão para enviar HTTPS
solicitações. Cada vez que um estágio anterior passa um documento $https
para, o estágio envia uma nova solicitação.
Sintaxe
Para enviar uma solicitação de HTTPS
para uma determinada conexão:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer> } } }
O estágio $https
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
| string | Obrigatório | Etiqueta que identifica a conexão no Registro de Conexão |
| corda | expressão | Opcional | Caminho para anexar à URL para a qual seu Por exemplo, se você especificar um Se você definir O endpoint API que você chama deve ser idempotente. |
| documento | Opcional | Documento contendo pares de valores-chave para passar como parâmetros para sua chamada de ponto de conexão da API. Cada chave deve ser uma string e cada valor deve avaliar para um valor numérico, de string ou booleano. Este campo suporta expressões como valores. |
| string | Opcional | Método de solicitação HTTPS para sua conexão. Deve ser um dos seguintes valores:
Padrão é |
| documento | Opcional | Documento contendo pares de valores-chave para passar como cabeçalhos para o ponto de conexão da API. Cada chave deve ser uma string e cada valor deve avaliar para uma string. Este campo suporta expressões como valores. Se o ponto de extremidade da API exigir autenticação, como uma chave de API ou autenticação de Token de Acesso ao Portador, você deverá adicionar detalhes de autenticação como cabeçalhos ao definir a conexão para evitar fornecê-los como texto simples como parte desse operador. Nomes e valores de cabeçalho HTTP inválidos não são enviados para o ponto de conexão da API. Em vez disso, estes são ignorados. Para saber mais sobre cabeçalhos HTTP inválidos, consulte RFC.9110 Se uma expressão em um valor falhar ou for avaliada como um tipo diferente de string, a mensagem será enviada para a fila de mensagens não entregues (DLQ) e o operador não enviará essa solicitação para o ponto de extremidade da API. |
| string | Obrigatório | Nome do campo para a resposta da API REST. Se o endpoint retornar 0 bytes, o operador não define o campo O operador suporta respostas com um Se o endpoint da API retornar uma resposta sem um |
| string | Opcional | Comportamento quando o operador encontra uma falha relacionada ao
O operador considera todos os códigos de status HTTP
O operador considera qualquer outro código de status HTTP como um erro
Padrão é |
| array | Opcional | Pipeline interno personalizado que permite personalizar o corpo da solicitação enviado para o endpoint da API. O
Por padrão, a mensagem inteira é enviada para o endpoint da API. O Atlas Stream Processing envia uma carga útil JSON de modo relaxado para o endpoint da API. Corpos de solicitação HTTP inválidos não são enviados para o endpoint da API. Em vez disso, eles são enviados para a fila de mensagens não entregues (DLQ). Para saber mais sobre corpos de solicitação HTTP inválidos, consulte RFC.9110 |
| documento | Opcional | documento que contém campo que substituem vários valores padrão. |
| inteiro | Opcional | O tempo, em segundos, após o qual uma conexão Padrão é |
| inteiro | Opcional | O tempo, em segundos, após o qual uma solicitação Padrão é |
Comportamento
$https
deve vir após o estágio e deve $source
vir antes do estágio $emit $merge
ou. Você pode usar $https
em $hoppingWindow
ou pipelines $tumblingWindow
internos.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$source
estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.Para cada registro do intermediário Apache Kafka, o estágio envia uma solicitação para uma fonte meteorológica HTTPS definida
$https
nahttps_weather
conexão. A solicitação usa oposition.coordinates
do registro na solicitação HTTPS para reunir uma previsão de alta temperatura de sete dias em graus Celsius para esse local, que é adicionada ao documento de pipeline em umairTemperatureForecast
campo.O estágio
$merge
grava a saída na coleção do Atlas chamadastream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.