Docs Menu

$https

이 페이지의 내용

$https

$https 단계는 HTTPS 요청을 보낼 연결 레지스트리의 연결을 지정합니다. $https에 문서를 전달할 때마다 이전 단계는 새 요청을 보냅니다.

지정된 연결에 HTTPS 요청을 보내려면:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>
}
}
}

$https 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

HTTPS 요청을 보낼 연결 레지스트리의 연결을 식별하는 레이블입니다.

path

문자열 | 표현식

옵션

connectionName이 확인하는 URL에 추가할 경로입니다.

예를 들어, https://sample.com로 확인되는 connectionName을 지정하면, 스트림 프로세서가 https://sample.com/endpointHTTPS 요청을 보내도록 "endpoint"path를 지정할 수 있습니다.

path를 표현식으로 정의하면, 이 표현식은 문자열로 평가되어야 합니다.

호출하는 API 엔드포인트는 멱등적이어야 합니다.

parameters

문서

옵션

API 엔드포인트 호출에 매개변수로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며, 각 값은 숫자, 문자열 또는 불리언 값으로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다.

method

문자열

옵션

연결에 대한 HTTPS 요청 방법입니다. 다음 값 중 하나여야 합니다.

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

기본값은 "GET"입니다.

headers

문서

옵션

API 엔드포인트에 헤더로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며 각 값은 문자열로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다.

API 엔드포인트에 API 키 또는 Bearer Access Token 인증과 같은 인증이 필요한 경우, 연결을 정의할 때 인증 세부 정보를 헤더로 추가하여 이 연산자의 일부에 일반 텍스트로 제공되지 않도록 해야 합니다.

잘못된 HTTP 헤더 이름과 값은 API 엔드포인트로 전송되지 않습니다. 대신 이러한 항목은 무시됩니다.

To learn more about invalid HTTP headers, see RFC 9110.

값의 표현식이 실패하거나 문자열이 아닌 다른 유형으로 평가되면, 메시지는 데드 레터 큐로 전송되며 연산자는 이 요청을 API 엔드포인트로 보내지 않습니다.

as

문자열

필수 사항

REST API 응답의 필드 이름입니다.

엔드포인트가 0바이트를 반환하면, 연산자는 as 필드를 설정하지 않습니다.

연산자는 Content-Typeapplication/json 또는 text/plain인 응답을 지원합니다. API 엔드포인트가 다른 Content-Type을 가진 응답을 반환하면, 연산자는 사용자가 정의한 onError 동작에 따라 문서를 처리합니다.

API 엔드포인트가 Content-Type이 정의되지 않은 상태로 응답을 반환하면, 연산자는 그 응답이 application/json이라고 가정합니다.

onError

문자열

옵션

연산자에 HTTPS 관련 오류가 발생할 때의 동작입니다. 다음 값 중 하나여야 합니다.

  • "dlq" : Pass the affected document to the 데드 레터 큐.

  • "ignore" : 영향을 받는 문서에 대해 아무 작업도 수행하지 않습니다.

연산자는 모든 2XX HTTP 상태 코드를 성공으로 간주합니다. 연산자가 다음 HTTP 상태 코드 중 하나를 응답으로 받으면, 연산자는 이 필드에 제공한 값에 따라 행동을 결정합니다:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

연산자는 다른 HTTP 상태 코드를 "fail" 오류로 간주합니다. 예를 들어, API 엔드포인트가 500 HTTP 상태 코드를 반환하면 프로세서는 실패 상태에 들어가고 중지됩니다.

기본값은 "dlq"입니다.

payload

배열

옵션

API 엔드포인트로 전송되는 요청 본문을 사용자 지정할 수 있는 맞춤형 내부 파이프라인입니다. payload는 다음 표현식을 지원합니다.

  • $project

  • $addFields

  • $replaceRoot

  • $set

기본적으로 전체 메시지가 API 엔드포인트로 전송됩니다. Atlas Stream Processing은 API 엔드포인트로 완화 모드 JSON 페이로드를 보냅니다.

유효하지 않은 HTTP 요청 본문은 API 엔드포인트로 전송되지 않습니다. 대신, 데드 레터 큐로 전송됩니다.

잘못된 HTTP 요청 본문에 대해 더 알아보려면 RFC 9110을 참조하세요.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.connectionTimeoutSec

integer

옵션

응답을 받지 못할 경우 성공적인 HTTPS 연결의 제한 시간이 초과되기까지의 시간(초 단위)입니다.

기본값은 30입니다.

config.requestTimeoutSec

integer

옵션

연결되지 않을 경우 HTTPS 요청의 제한 시간이 초과되기까지의 시간(초 단위)입니다.

기본값은 60입니다.

$https$source 단계 뒤에 오고, $emit 또는 $merge 단계 앞에 와야 합니다. $https$hoppingWindow 또는 $tumblingWindow 내부 파이프라인에서 사용할 수 있습니다.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여 ingestionTime으로 설정합니다.

  2. Apache Kafka 브로커의 각 레코드에 대해 $https 단계는 https_weather 연결에 정의된 HTTPS 날씨 소스에 요청을 보냅니다. 요청은 HTTPS 요청의 레코드에서 position.coordinates를 사용하여 해당 위치에 대한 7일간의 최고 기온 예측(섭씨)을 수집하고, 이를 파이프라인 문서의 airTemperatureForecast 필드에 추가합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

이 페이지의 내용