Docs Menu
Docs Home
/
MongoDB 매뉴얼
/ / /

sp.createStreamProcessor()

이 페이지의 내용

  • 정의
  • 호환성
  • 구문
  • 명령 필드
  • 행동
  • 액세스 제어
  • 예시
  • 자세히 알아보기
sp.createStreamProcessor()

버전 7.0의 새로운 기능: 현재스트림 처리 인스턴스 에 스트림 프로세서 를 생성합니다.

이 메서드는 Atlas Stream Processing Instances에서 지원됩니다.

sp.createStreamProcessor() 메서드의 구문은 다음과 같습니다.

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 는 다음 필드를 사용합니다.

필드
유형
필요성
설명
name
문자열
필수 사항
스트림 프로세서의 논리적 이름입니다. 이는 Atlas Stream Processing 인스턴스 내에서 고유해야 합니다.
pipeline
배열
필수 사항
스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 입니다.
options
객체
옵션
스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다.
options.dlq
객체
조건부
Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다.
options.dlq.connectionName
문자열
조건부
연결 레지스트리에서 연결을 식별하는 레이블입니다. 이 연결은 Atlas 클러스터를 참조해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.db
문자열
조건부
options.dlq.connectionName에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.coll
문자열
조건부
options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

sp.createStreamProcessor() 현재 스트림 처리 인스턴스 에 영구 명명된 스트림 프로세서를 생성합니다. 이 스트림 프로세서를 sp.processor.start())로 초기화할 수 있습니다. 기존 스트림 프로세서와 동일한 이름으로 스트림 프로세서를 만들려고 하면 mongosh 에서 오류를 반환합니다.

sp.createStreamProcessor() 을(를) 실행하는 사용자에게는 atlasAdmin 역할이 있어야 합니다.

다음 예에서는 sample_stream_solar 연결에서 데이터를 수집하는 solarDemo 이라는 이름의 스트림 프로세서를 만듭니다. 프로세서는 device_id 필드 값이 device_8 인 모든 문서를 제외하고, 나머지는 지속 시간이 10초인 텀블링 창 으로 전달합니다. 각 창은 수신한 문서를 그룹화한 다음 각 그룹에 대한 다양하고 유용한 통계를 반환합니다. 그런 다음 스트림 프로세서는 mongodb1 연결을 통해 이러한 레코드를 solar_db.solar_coll 에 병합합니다.

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: NumberInt(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

돌아가기

Atlas Stream Processing