문서 메뉴
문서 홈
/
MongoDB 아틀라스
/

스트림 프로세서 관리

이 페이지의 내용

Atlas Stream Processing 스트림 프로세서는 고유하게 명명된 스트림 집계 파이프라인 의 로직을 스트리밍 데이터에 적용합니다. Atlas Stream Processing은 각 스트림 프로세서 정의를 영구 저장소에 저장하여 재사용할 수 있습니다. 지정된 스트림 프로세서는 해당 정의가 저장된 스트림 처리 인스턴스 에서만 사용할 수 있습니다. Atlas Stream Processing은 작업자당 최대 4 스트림 프로세서를 지원합니다. 이 제한을 초과하는 추가 프로세서의 경우 Atlas Stream Processing에서 새 리소스를 할당합니다.

스트림 프로세서를 생성하고 managed 다음이 필요합니다.

대부분의 스트림 프로세서 명령은 메서드 호출에서 관련 스트림 프로세서의 이름을 지정해야 합니다. 다음 섹션에서 설명하는 구문은 엄밀히 영숫자 이름을 가정합니다. 스트림 프로세서의 이름에 하이픈(-) 또는 마침표(.)와 같은 영숫자가 아닌 문자가 포함된 경우, 이름을 대괄호([])와 큰따옴표("")로 묶어야 합니다. 메서드 호출(예: sp.["special-name-stream"].stats().

sp.process() 메서드를 사용하여 대화형으로 스트림 프로세서를 만들 수 있습니다. 대화형으로 생성하는 스트림 프로세서는 다음과 같은 동작을 나타냅니다.

  • 출력 및 데드 레터 큐 문서를 shell에 쓰기

  • 생성 즉시 실행 시작

  • 10 분 동안 또는 사용자가 중지할 때까지 실행합니다.

  • 중지한 후 지속하지 않음

대화형으로 생성하는 스트림 프로세서는 프로토타입 제작을 위한 것입니다. 영구 스트림 프로세서를 만들려면 스트림 프로세서 만들기를 참조하세요.

sp.process() 의 구문은 다음과 같습니다:

sp.process(<pipeline>)
필드
유형
필요성
설명
pipeline
배열
필수 사항
스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .
1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

예제

다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner 이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 애그리게이션 단계가 포함된 배열을 pipeline 라는 변수에 할당합니다.

다음 예제에서는 연결 레지스트리에서 myKafka 연결의 stuff 주제를 $source 로 사용하여 temperature 필드의 값이 46 인 레코드와 일치시키고, 처리된 메시지를 output 로 내보냅니다. 연결 레지스트리의 mySink 연결 주제:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

다음 명령은 pipeline 에 정의된 로직을 적용하는 스트림 프로세서를 만듭니다.

sp.process(pipeline)

mongosh 을 사용하여 새 스트림 프로세서를 만들려면 sp.createStreamProcessor() 메서드를 사용합니다. 다음과 같은 구문을 가집니다.

sp.createStreamProcessor(<name>, <pipeline>, <options>)
논의
유형
필요성
설명
name
문자열
필수 사항
스트림 프로세서의 논리적 이름입니다. 이는 스트림 처리 인스턴스 내에서 고유해야 합니다. 이 이름에는 영숫자만 포함되어야 합니다.
pipeline
배열
필수 사항
스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .
options
객체
옵션
스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다.
options.dlq
객체
조건부
Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다.
options.dlq.connectionName
문자열
조건부
연결 레지스트리에서 연결을 식별하는 사람이 읽을 수 있는 레이블입니다. 이 연결은 Atlas cluster를 참고해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.db
문자열
조건부
options.dlq.connectionName에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.coll
문자열
조건부
options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

예제

다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner 이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 애그리게이션 단계가 포함된 배열을 pipeline 라는 변수에 할당합니다.

다음 예제에서는 연결 레지스트리에서 myKafka 연결의 stuff 주제를 $source 로 사용하여 temperature 필드의 값이 46 인 레코드와 일치시키고, 처리된 메시지를 output 로 내보냅니다. 연결 레지스트리의 mySink 연결 주제:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh 프롬프트에서 DLQ의 다음 속성이 포함된 객체를 할당합니다.

  • connectionName

  • 데이터베이스 이름

  • 컬렉션 이름

다음 예제에서는 metadata.dlq 데이터베이스 collection에서 cluster01 연결을 통한 DLQ를 정의합니다.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

다음 명령어는 pipeline 에 정의된 로직을 적용하는 proc01 라는 이름의 스트림 프로세서를 만듭니다. 처리 중 오류가 발생한 문서는 deadLetter 에 정의된 DLQ에 기록됩니다.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

mongosh 으로 기존 스트림 프로세서를 시작하려면 sp.<streamprocessor>.start() 메서드를 사용합니다. <streamprocessor> 는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 시작하려면 다음 명령을 실행합니다.

sp.proc01.start()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하지만 현재 실행 중이 아닌 경우.

  • false 존재하지 않거나 존재하지만 현재 실행 중인 스트림 프로세서를 시작하려고 하는 경우

mongosh 으로 기존 스트림 프로세서를 중지하려면 sp.<streamprocessor>.stop() 메서드를 사용합니다. <streamprocessor> 는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 중지하려면 다음 명령을 실행합니다.

sp.proc01.stop()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하고 현재 실행 중인 경우

  • false 스트림 프로세서가 존재하지 않거나 스트림 프로세서가 현재 실행 중이 아닌 경우.

mongosh 으로 기존 스트림 프로세서를 삭제하려면 sp.<streamprocessor>.drop() 메서드를 사용합니다. <streamprocessor> 는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 삭제하려면 다음 명령을 실행합니다.

sp.proc01.drop()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하는 경우.

  • false 스트림 프로세서가 존재하지 않는 경우.

스트림 프로세서를 삭제하면 Atlas Stream Processing이 이에 대해 프로비저닝한 모든 리소스와 저장된 모든 상태가 함께 폐기됩니다.

mongosh 을 사용하여 현재 스트림 처리 인스턴스에서 사용 가능한 모든 스트림 프로세서를 나열하려면 sp.listStreamProcessors() 메서드를 사용합니다. 각 스트림 프로세서와 관련된 이름, 시작 시간, 현재 상태 및 파이프라인이 포함된 문서 목록을 반환합니다. 다음과 같은 구문을 가집니다.

sp.listStreamProcessors(<filter>)

<filter> 목록을 필터링할 필드를 지정하는 문서입니다.

예제

다음 예는 필터링되지 않은 요청의 반환 값을 보여줍니다.

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

동일한 Atlas Stream Processing 인스턴스에서 명령을 다시 실행하여 "running""state" 에 대해 필터링하면 다음 출력이 표시됩니다.

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

기존 스트림 프로세서에서 샘플링된 결과의 배열을 mongosh 사용하여 STDOUT 으)로 반환하려면 sp.<streamprocessor>.sample() 메서드를 사용합니다. <streamprocessor> 은(는) 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 예를 들어, 다음 명령은 proc01 이라는 스트림 프로세서에서 샘플링합니다.

sp.proc01.sample()

이 명령은 CTRL-C 을 사용하여 취소할 때까지 또는 반환된 샘플의 누적 크기가 40 MB에 도달할 때까지 계속 실행됩니다. 스트림 프로세서는 샘플의 유효하지 않은 문서를 다음 형식의 _dlqMessage 문서로 보고합니다.

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>'
}
}

이러한 메시지를 사용하면 데드 레터 큐 컬렉션을 정의하지 않고도 데이터 보안 문제를 진단할 수 있습니다.

mongosh 을 사용하여 기존 스트림 프로세서의 현재 상태를 요약하는 문서를 반환하려면 sp.<streamprocessor>.stats() 메서드를 사용합니다. streamprocessor 는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 다음과 같은 구문을 가집니다.

sp.<streamprocessor>.stats({options: {<options>}})

여기서 options 은 다음 필드가 있는 선택적 문서입니다.

필드
유형
설명
scale
integer
출력의 항목 크기에 사용할 단위입니다. 기본적으로 Atlas Stream Processing은 항목 크기를 바이트 단위로 표시합니다. KB 단위로 표시하려면 1024scale 을 지정합니다.
verbose
부울
출력 문서의 상세 수준을 지정하는 플래그입니다. true 로 설정하면 출력 문서에는 파이프라인의 각 개별 연산자에 대한 통계를 보고하는 하위 문서가 포함됩니다. 기본값은 false 입니다.

출력 문서에는 다음과 같은 필드가 있습니다.

필드
유형
설명
ns
문자열
스트림 프로세서가 정의된 네임스페이스입니다.
stats
객체
스트림 프로세서의 작동 상태를 설명하는 문서입니다.
stats.name
문자열
스트림 프로세서의 이름입니다.
stats.status
문자열

스트림 프로세서의 상태입니다. 이 필드는 다음과 같은 값을 가질 수 있습니다:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
integer
크기 필드가 표시되는 배율입니다. 1 로 설정하면 크기가 바이트 단위로 표시됩니다. 1024 로 설정하면 크기가 킬로바이트 단위로 표시됩니다.
stats.inputMessageCount
integer
스트림에 게시된 문서 수입니다. 문서가 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과하면 스트림에 '게시된' 것으로 간주됩니다.
stats.inputMessageSize
integer
스트림에 게시된 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과한 후에 스트림에 '게시된' 것으로 간주됩니다.
stats.outputMessageCount
integer
스트림에서 처리한 문서 수입니다. 문서가 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.
stats.outputMessageSize
integer
스트림에서 처리한 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.
stats.dlqMessageCount
integer
stats.dlqMessageSize
integer
stats.stateSize
integer
Windows에서 프로세서 상태를 저장하는 데 사용하는 바이트 수입니다.
stats.watermark
integer
현재 워터마크의 타임스탬프입니다.
stats.operatorStats
배열

프로세서 파이프라인의 각 연산자에 대한 통계입니다. Atlas Stream Processing은 verbose 옵션을 전달한 경우에만 이 필드를 반환합니다.

stats.operatorStats 다양한 핵심 stats 필드의 연산자별 버전을 제공합니다.

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

또한 stats.operatorStats 에는 다음과 같은 고유 필드가 포함되어 있습니다.

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
integer
연산자의 최대 메모리 사용량(바이트 또는 킬로바이트)입니다.
stats.operatorStats.executionTime
integer
연산자의 총 실행 시간(초)입니다.
stats.kafkaPartitions
배열
Apache Kafka 에 대한 오프셋 정보 브로커의 파티션. 는kafkaPartitions Apache Kafka 를 사용하는 연결에만 적용됩니다. 소스.
stats.kafkaPartitions.partition
integer
Apache Kafka 주제 파티션 번호입니다.
stats.kafkaPartitions.currentOffset
integer
지정된 파티션에 대해 스트림 프로세서가 있는 오프셋입니다. 이 값은 스트림 프로세서가 처리한 이전 오프셋에 1 을 더한 것과 같습니다.
stats.kafkaPartitions.checkpointOffset
integer
스트림 프로세서가 Apache Kafka 에 마지막으로 커밋한 오프셋입니다. 브로커와 지정된 파티션에 대한 체크포인트. 이 오프셋을 통한 모든 메시지는 마지막 체크포인트에 기록됩니다.

예를 들어 다음은 항목 크기가 KB 단위로 표시되는 inst01 Atlas Stream Processing 인스턴스에서 proc01 스트림 프로세서의 상태를 보여줍니다.

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}
← 인스턴스 구성