Menu Docs

$https

$https

O estágio $https especifica uma conexão no Registro de Conexão para enviar HTTPS solicitações. Cada vez que uma etapa anterior passa um documento para $https, a etapa envia uma nova solicitação.

Para enviar uma solicitação de HTTPS para uma conexão específica:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>
}
}
}

O estágio $https recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Rótulo que identifica a conexão no Registro de Conexão para enviar uma solicitação HTTPS.

path

corda | expressão

Opcional

Caminho a ser anexado ao URL ao qual seu connectionName se resolve.

Por exemplo, se você especificar um connectionName que se resolve em https://sample.com, você pode especificar um path de "endpoint" para que seu processador de fluxo envie solicitações HTTPS para https://sample.com/endpoint.

Se você definir path como uma expressão, essa expressão deve ser avaliada como uma string.

O ponto de extremidade da API que você chama deve ser idempotente.

parameters

documento

Opcional

Documento contendo pares de chave-valor para serem passados como parâmetros para sua chamada de ponto de extremidade da API. Cada chave deve ser uma string, e cada valor deve ser avaliado como um valor numérico, string ou booleano. Este campo suporta expressões como valores.

method

string

Opcional

Método de solicitação HTTPS para sua conexão. Deve ser um dos seguintes valores:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

Padrão é "GET".

headers

documento

Opcional

Documento contendo pares chave-valor para serem passados como cabeçalhos para o ponto de extremidade da API. Cada chave deve ser uma string, e cada valor deve resultar em uma string. Este campo suporta expressões como valores.

Se o endpoint da API exigir autenticação, como uma chave de API ou autenticação por Token de Acesso Bearer, você deve adicionar os detalhes de autenticação como cabeçalhos ao definir a conexão para evitar fornecê-los como texto simples como parte deste operador.

Nomes e valores de cabeçalhos HTTP inválidos não são enviados ao ponto de extremidade da API. Em vez disso, estes são ignorados.

Para saber mais sobre cabeçalhos HTTP inválidos, consulte RFC.9110

Se uma expressão em um valor falhar ou for avaliada como um tipo diferente de string, a mensagem será enviada para a fila de mensagens não entregues (DLQ) e o operador não enviará essa solicitação para o ponto de extremidade da API.

as

string

Obrigatório

Nome do campo para a resposta da REST API.

Se o ponto de extremidade retornar 0 bytes, o operador não configurará o campo as.

O operador suporta respostas com um Content-Type de application/json ou text/plain. Se o ponto de extremidade da API retornar uma resposta com um Content-Type diferente, o operador processará o documento com base no comportamento onError que você definir.

Se o ponto de extremidade da API retornar uma resposta sem um Content-Type definido, o operador assume que a resposta é application/json.

onError

string

Opcional

Comportamento quando o operador encontra uma falha relacionada a HTTPS. Deve ser um dos seguintes valores:

  • "dlq" : Pass the affected document to the dead letter queue (DLQ).

  • "ignore" : Não faça nada com o documento afetado.

  • "fail" : encerra o processador de fluxo em caso de erro.

O operador considera todos os códigos de status HTTP 2XX como bem-sucedidos. Se o operador receber algum dos seguintes códigos de status HTTP em resposta, ele seguirá o comportamento com base no valor que você fornecer neste campo:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

O operador considera qualquer outro código de status HTTP como um erro "fail". Por exemplo, se o ponto de extremidade da API retornar um código de status HTTP 500, o processador entra em um estado de falha e para.

onError não aciona em erros resultantes da configuração incorreta do próprio operador $https, como expressões inválidas.

Padrão é "dlq".

payload

array

Opcional

Pipeline interno personalizado que permite personalizar o corpo da solicitação enviado ao ponto de extremidade da API. payload suporta as seguintes expressões:

  • $project

  • $addFields

  • $replaceRoot

  • $set

Por padrão, a mensagem completa é enviada para o ponto de extremidade da API. O Atlas Stream Processing envia um payload JSON em modo relaxado para o ponto de extremidade da API.

Corpos de solicitações HTTP inválidas não são enviados para o ponto de extremidade da API. Em vez disso, elas são enviadas para a fila de mensagens não entregues (DLQ).

Para saber mais sobre corpos de solicitação HTTP inválidos, consulte RFC 9110.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.connectionTimeoutSec

inteiro

Opcional

O tempo, em segundos, após o qual uma conexão HTTPS bem-sucedida é encerrada se não receber resposta.

Padrão é 30.

config.requestTimeoutSec

inteiro

Opcional

O tempo, em segundos, após o qual uma solicitação HTTPS atinge o tempo limite se não conseguir se conectar.

Padrão é 60.

$https deve vir após o estágio $source e deve vir antes do estágio $emit ou do estágio $merge. Você pode usar $https em $hoppingWindow ou $tumblingWindow pipelines internos.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. Para cada registro do broker do Apache Kafka, o estágio $https envia uma solicitação para uma fonte meteorológica HTTPS definida na conexão https_weather. A solicitação usa o position.coordinates do registro na solicitação HTTPS para obter uma previsão de alta temperatura de sete dias em graus Celsius para essa localização, que adiciona ao documento do pipeline em um campo airTemperatureForecast.

  3. O estágio $merge grava a saída na coleção do Atlas chamada stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.