$https
정의
단계는 $https
요청을 보낼 연결 레지스트리의 HTTPS
연결을 지정합니다. 이전 단계에서 에 문서 를 전달할 $https
때마다 새 요청 을 보냅니다.
구문
지정된 연결에 HTTPS
요청 을 보내려면 다음을 수행합니다.
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer> } } }
$https
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 | 요청 을 보낼 연결 레지스트리의 연결을 식별하는 |
| 문자열 | 표현식 | 옵션 |
예를 예시
호출하는 API 엔드포인트는 멱등 있어야 합니다. |
| 문서 | 옵션 | API 엔드포인트 호출에 매개변수로 전달할 키-값 쌍이 포함된 문서입니다. 각 키는 문자열이어야 하며 각 값은 숫자, 문자열 또는 부울 값으로 평가되어야 합니다. 이 필드 는 표현식을 값으로 지원합니다. |
| 문자열 | 옵션 | 연결을 위한 HTTPS 요청 메서드입니다. 다음 값 중 하나여야 합니다.
기본값은 |
| 문서 | 옵션 | API 엔드포인트에 헤더로 전달할 키-값 쌍이 포함된 문서입니다. 각 키는 문자열이어야 하며 각 값은 문자열로 평가되어야 합니다. 이 필드 는 표현식을 값으로 지원합니다. API 키 또는 베어러 액세스 토큰 인증 과 같이 API 엔드포인트에 인증 이 필요한 경우, 연결을 정의할 때 인증 세부 정보를 헤더로 추가하여 이 연산자 의 일부로 일반 텍스트로 제공되지 않도록 해야 합니다. 잘못된 HTTP 헤더 이름과 값은 API 엔드포인트로 전송되지 않습니다. 대신 이러한 항목은 무시됩니다. 유효하지 않은 HTTP headers 에 학습 보려면 RFC 9110 를 참조하세요. 값의 표현식 이 실패하거나 문자열이 아닌 다른 유형으로 평가되면 메시지는 데드 레터 큐 전송되고 연산자 는 이 요청 을 API 엔드포인트로 보내지 않습니다. |
| 문자열 | 필수 사항 | REST API 응답에 대한 필드 의 이름입니다. 엔드포인트가 0 바이트를 반환하는 경우 연산자 는 연산자 는 API 엔드포인트가 정의된 |
| 문자열 | 옵션 | 연산자 에
연산자 는 모든
연산자 는 다른 모든 HTTP 상태 코드를
기본값은 |
| 배열 | 옵션 | API 엔드포인트로 전송된 요청 본문을 사용자 지정할 수 있는 사용자 지정 내부 파이프라인 입니다.
기본값 전체 메시지가 API 엔드포인트로 전송됩니다. Atlas Stream Processing 은 완화 모드 JSON 페이로드를 API 엔드포인트로 전송합니다. 유효하지 않은 HTTP 요청 본문은 API 엔드포인트로 전송되지 않습니다.대신 데드 레터 큐 로 전송됩니다. 유효하지 않은 HTTP 요청 본문에 학습 보려면 RFC 9110 를 참조하세요. |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| integer | 옵션 | 응답을 받지 못할 경우 성공적인 기본값은 |
| integer | 옵션 | 연결할 수 없는 경우 기본값은 |
행동
$https
은 단계 뒤에 와야 하며 $source
$emit 또는 단계 $merge
앞에 와야 합니다. 또는 내부 파이프라인에서 $https
$hoppingWindow
를 $tumblingWindow
사용할 수 있습니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.Apache Kafka 브로커의
$https
각 기록https_weather
에 대해 단계에서는 연결에 정의된 HTTPS 날씨 소스로 요청 을 보냅니다. 요청 은 HTTPS 요청 의 기록 에서 를 사용하여position.coordinates
해당 위치 의 7일 최고 기온 예보를 섭씨 단위로 수집하고, 이를 파이프라인 문서 의 필드 에airTemperatureForecast
추가합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.