$https
정의
$https
단계는 HTTPS
요청을 보낼 연결 레지스트리의 연결을 지정합니다.
$https
에 문서를 전달할 때마다 이전 단계는 새 요청을 보냅니다.
구문
지정된 연결에 HTTPS
요청을 보내려면:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer> } } }
$https
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 |
|
| 문자열 | 표현식 | 옵션 |
예를 들어,
호출하는 API 엔드포인트는 멱등적이어야 합니다. |
| 문서 | 옵션 | API 엔드포인트 호출에 매개변수로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며, 각 값은 숫자, 문자열 또는 불리언 값으로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다. |
| 문자열 | 옵션 | 연결에 대한 HTTPS 요청 방법입니다. 다음 값 중 하나여야 합니다.
기본값은 |
| 문서 | 옵션 | API 엔드포인트에 헤더로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며 각 값은 문자열로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다. API 엔드포인트에 API 키 또는 Bearer Access Token 인증과 같은 인증이 필요한 경우, 연결을 정의할 때 인증 세부 정보를 헤더로 추가하여 이 연산자의 일부에 일반 텍스트로 제공되지 않도록 해야 합니다. 잘못된 HTTP 헤더 이름과 값은 API 엔드포인트로 전송되지 않습니다. 대신 이러한 항목은 무시됩니다. To learn more about invalid HTTP headers, see RFC 9110. 값의 표현식이 실패하거나 문자열이 아닌 다른 유형으로 평가되면, 메시지는 데드 레터 큐로 전송되며 연산자는 이 요청을 API 엔드포인트로 보내지 않습니다. |
| 문자열 | 필수 사항 | REST API 응답의 필드 이름입니다. 엔드포인트가 0바이트를 반환하면, 연산자는 연산자는 API 엔드포인트가 |
| 문자열 | 옵션 | 연산자에
연산자는 모든
연산자는 다른 HTTP 상태 코드를 기본값은 |
| 배열 | 옵션 | API 엔드포인트로 전송되는 요청 본문을 사용자 지정할 수 있는 맞춤형 내부 파이프라인입니다.
기본적으로 전체 메시지가 API 엔드포인트로 전송됩니다. Atlas Stream Processing은 API 엔드포인트로 완화 모드 JSON 페이로드를 보냅니다. 유효하지 않은 HTTP 요청 본문은 API 엔드포인트로 전송되지 않습니다. 대신, 데드 레터 큐로 전송됩니다. 잘못된 HTTP 요청 본문에 대해 더 알아보려면 RFC 9110을 참조하세요. |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| integer | 옵션 | 응답을 받지 못할 경우 성공적인 기본값은 |
| integer | 옵션 | 연결되지 않을 경우 기본값은 |
행동
$https
는 $source
단계 뒤에 오고, $emit 또는 $merge
단계 앞에 와야 합니다. $https
를 $hoppingWindow
또는 $tumblingWindow
내부 파이프라인에서 사용할 수 있습니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.Apache Kafka 브로커의 각 레코드에 대해
$https
단계는https_weather
연결에 정의된 HTTPS 날씨 소스에 요청을 보냅니다. 요청은 HTTPS 요청의 레코드에서position.coordinates
를 사용하여 해당 위치에 대한 7일간의 최고 기온 예측(섭씨)을 수집하고, 이를 파이프라인 문서의airTemperatureForecast
필드에 추가합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.