sp.process()
정의
버전 7.0의 새로운 기능: 현재스트림 처리 인스턴스 에 임시 스트림 프로세서 를 생성합니다.
호환성
구문
sp.process()
메서드의 구문은 다음과 같습니다.
sp.process( [ <pipeline> ], { <options> } )
명령 필드
sp.createStreamProcessor()
는 다음 필드를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 | 스트림 프로세서의 논리적 이름입니다. 이는 Atlas Stream Processing 인스턴스 내에서 고유해야 합니다. |
| 배열 | 필수 사항 | 스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 입니다. |
| 객체 | 옵션 | 스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다. |
| 객체 | 조건부 | Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 |
| 문자열 | 조건부 | 연결 레지스트리에서 연결을 식별하는 레이블입니다. 이 연결은 Atlas 클러스터를 참조해야 합니다. 이 필드는 |
| 문자열 | 조건부 |
|
| 문자열 | 조건부 |
|
행동
sp.process()
현재 Atlas Stream Processing 인스턴스에 이름이 없는 임시 스트림 프로세서를 생성하고 즉시 초기화합니다. 이 스트림 프로세서는 실행되는 동안에만 유지됩니다. 임시 스트림 프로세서를 종료한 경우 이를 사용하려면 다시 생성해야 합니다.
액세스 제어
sp.process()
을(를) 실행하는 사용자에게는 atlasAdmin
역할이 있어야 합니다.
예시
다음 예에서는 sample_stream_solar
연결에서 데이터를 수집하는 임시 스트림 프로세서를 만듭니다. 프로세서는 device_id
필드 값이 device_8
인 모든 문서를 제외하고, 나머지는 지속 시간이 10초인 텀블링 창 으로 전달합니다. 각 창은 수신한 문서를 그룹화한 다음 각 그룹에 대한 다양하고 유용한 통계를 반환합니다. 그런 다음 스트림 프로세서는 mongodb1
연결을 통해 이러한 레코드를 solar_db.solar_coll
에 병합합니다.
sp.process( [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )