Docs Menu
Docs Home
/
MongoDB 매뉴얼
/ / /

sp.process()

이 페이지의 내용

  • 정의
  • 호환성
  • 구문
  • 명령 필드
  • 행동
  • 액세스 제어
  • 예시
  • 자세히 알아보기
sp.process()

버전 7.0의 새로운 기능: 현재스트림 처리 인스턴스 에 임시 스트림 프로세서 를 생성합니다.

이 메서드는 Atlas Stream Processing Instances에서 지원됩니다.

sp.process() 메서드의 구문은 다음과 같습니다.

sp.process(
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 는 다음 필드를 사용합니다.

필드
유형
필요성
설명

name

문자열

필수 사항

스트림 프로세서의 논리적 이름입니다. 이는 Atlas Stream Processing 인스턴스 내에서 고유해야 합니다.

pipeline

배열

필수 사항

스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 입니다.

options

객체

옵션

스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다.

options.dlq

객체

조건부

Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다.

options.dlq.connectionName

문자열

조건부

연결 레지스트리에서 연결을 식별하는 레이블입니다. 이 연결은 Atlas 클러스터를 참조해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

options.dlq.db

문자열

조건부

options.dlq.connectionName에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

options.dlq.coll

문자열

조건부

options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

sp.process() 현재 Atlas Stream Processing 인스턴스에 이름이 없는 임시 스트림 프로세서를 생성하고 즉시 초기화합니다. 이 스트림 프로세서는 실행되는 동안에만 유지됩니다. 임시 스트림 프로세서를 종료한 경우 이를 사용하려면 다시 생성해야 합니다.

sp.process() 을(를) 실행하는 사용자에게는 atlasAdmin 역할이 있어야 합니다.

다음 예에서는 sample_stream_solar 연결에서 데이터를 수집하는 임시 스트림 프로세서를 만듭니다. 프로세서는 device_id 필드 값이 device_8 인 모든 문서를 제외하고, 나머지는 지속 시간이 10초인 텀블링 창 으로 전달합니다. 각 창은 수신한 문서를 그룹화한 다음 각 그룹에 대한 다양하고 유용한 통계를 반환합니다. 그런 다음 스트림 프로세서는 mongodb1 연결을 통해 이러한 레코드를 solar_db.solar_coll 에 병합합니다.

sp.process(
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: NumberInt(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

돌아가기

sp.listStreamProcessors