스트림 프로세서 관리
이 페이지의 내용
Atlas Stream Processing 스트림 프로세서는 고유하게 명명된 스트림 집계 파이프라인 의 로직을 스트리밍 데이터에 적용합니다. Atlas Stream Processing은 각 스트림 프로세서 정의를 영구 저장소에 저장하여 재사용할 수 있습니다. 지정된 스트림 프로세서는 해당 정의가 저장된 스트림 처리 인스턴스 에서만 사용할 수 있습니다. Atlas Stream Processing은 작업자당 최대 4 스트림 프로세서를 지원합니다. 이 제한을 초과하는 추가 프로세서의 경우 Atlas Stream Processing에서 새 리소스를 할당합니다.
전제 조건
스트림 프로세서를 생성하고 managed 다음이 필요합니다.
mongosh
버전 2.0 이상스트림 프로세서를 생성하고 실행할 수 있는
atlasAdmin
역할이 있는 데이터베이스 사용자Atlas 클러스터
고려 사항
대부분의 스트림 프로세서 명령은 메서드 호출에서 관련 스트림 프로세서의 이름을 지정해야 합니다. 다음 섹션에서 설명하는 구문은 엄밀히 영숫자 이름을 가정합니다. 스트림 프로세서의 이름에 하이픈(-
) 또는 마침표(.
)와 같은 영숫자가 아닌 문자가 포함된 경우, 이름을 대괄호([]
)와 큰따옴표(""
)로 묶어야 합니다. 메서드 호출(예: sp.["special-name-stream"].stats()
.
대화형으로 스트림 프로세서 만들기
sp.process()
메서드를 사용하여 대화형으로 스트림 프로세서를 만들 수 있습니다. 대화형으로 생성하는 스트림 프로세서는 다음과 같은 동작을 나타냅니다.
출력 및 데드 레터 큐 문서를 shell에 쓰기
생성 즉시 실행 시작
10 분 동안 또는 사용자가 중지할 때까지 실행합니다.
중지한 후 지속하지 않음
대화형으로 생성하는 스트림 프로세서는 프로토타입 제작을 위한 것입니다. 영구 스트림 프로세서를 만들려면 스트림 프로세서 만들기를 참조하세요.
sp.process()
의 구문은 다음과 같습니다:
sp.process(<pipeline>)
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
pipeline | 배열 | 필수 사항 | 스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 . |
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh
을(를) 사용하여 연결합니다.
예제
다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner
이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
메시지가 표시되면 사용자 비밀번호를 입력합니다.
파이프라인을 정의합니다.
mongosh
프롬프트에서 적용하려는 애그리게이션 단계가 포함된 배열을 pipeline
라는 변수에 할당합니다.
다음 예제에서는 연결 레지스트리에서 myKafka
연결의 stuff
주제를 $source
로 사용하여 temperature
필드의 값이 46
인 레코드와 일치시키고, 처리된 메시지를 output
로 내보냅니다. 연결 레지스트리의 mySink
연결 주제:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
스트림 프로세서 생성
mongosh
을 사용하여 새 스트림 프로세서를 만들려면 sp.createStreamProcessor()
메서드를 사용합니다. 다음과 같은 구문을 가집니다.
sp.createStreamProcessor(<name>, <pipeline>, <options>)
논의 | 유형 | 필요성 | 설명 |
---|---|---|---|
name | 문자열 | 필수 사항 | 스트림 프로세서의 논리적 이름입니다. 이는 스트림 처리 인스턴스 내에서 고유해야 합니다. 이 이름에는 영숫자만 포함되어야 합니다. |
pipeline | 배열 | 필수 사항 | 스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 . |
options | 객체 | 옵션 | 스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다. |
options.dlq | 객체 | 조건부 | Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다. |
options.dlq.connectionName | 문자열 | 조건부 | 연결 레지스트리에서 연결을 식별하는 사람이 읽을 수 있는 레이블입니다. 이 연결은 Atlas cluster를 참고해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다. |
options.dlq.db | 문자열 | 조건부 | options.dlq.connectionName 에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다. |
options.dlq.coll | 문자열 | 조건부 | options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다. |
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh
을(를) 사용하여 연결합니다.
예제
다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner
이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
메시지가 표시되면 사용자 비밀번호를 입력합니다.
파이프라인을 정의합니다.
mongosh
프롬프트에서 적용하려는 애그리게이션 단계가 포함된 배열을 pipeline
라는 변수에 할당합니다.
다음 예제에서는 연결 레지스트리에서 myKafka
연결의 stuff
주제를 $source
로 사용하여 temperature
필드의 값이 46
인 레코드와 일치시키고, 처리된 메시지를 output
로 내보냅니다. 연결 레지스트리의 mySink
연결 주제:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
스트림 프로세서 시작
mongosh
으로 기존 스트림 프로세서를 시작하려면 sp.<streamprocessor>.start()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 시작하려면 다음 명령을 실행합니다.
sp.proc01.start()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하지만 현재 실행 중이 아닌 경우.false
존재하지 않거나 존재하지만 현재 실행 중인 스트림 프로세서를 시작하려고 하는 경우
스트림 프로세서 중지
mongosh
으로 기존 스트림 프로세서를 중지하려면 sp.<streamprocessor>.stop()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 중지하려면 다음 명령을 실행합니다.
sp.proc01.stop()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하고 현재 실행 중인 경우false
스트림 프로세서가 존재하지 않거나 스트림 프로세서가 현재 실행 중이 아닌 경우.
스트림 프로세서 삭제
mongosh
으로 기존 스트림 프로세서를 삭제하려면 sp.<streamprocessor>.drop()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 삭제하려면 다음 명령을 실행합니다.
sp.proc01.drop()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하는 경우.false
스트림 프로세서가 존재하지 않는 경우.
스트림 프로세서를 삭제하면 Atlas Stream Processing이 이에 대해 프로비저닝한 모든 리소스와 저장된 모든 상태가 함께 폐기됩니다.
사용 가능한 스트림 프로세서 목록
mongosh
을 사용하여 현재 스트림 처리 인스턴스에서 사용 가능한 모든 스트림 프로세서를 나열하려면 sp.listStreamProcessors()
메서드를 사용합니다. 각 스트림 프로세서와 관련된 이름, 시작 시간, 현재 상태 및 파이프라인이 포함된 문서 목록을 반환합니다. 다음과 같은 구문을 가집니다.
sp.listStreamProcessors(<filter>)
<filter>
목록을 필터링할 필드를 지정하는 문서입니다.
예제
다음 예는 필터링되지 않은 요청의 반환 값을 보여줍니다.
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
동일한 Atlas Stream Processing 인스턴스에서 명령을 다시 실행하여 "running"
의 "state"
에 대해 필터링하면 다음 출력이 표시됩니다.
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
스트림 프로세서의 샘플
기존 스트림 프로세서에서 샘플링된 결과의 배열을 mongosh
사용하여 STDOUT
으)로 반환하려면 sp.<streamprocessor>.sample()
메서드를 사용합니다. <streamprocessor>
은(는) 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 예를 들어, 다음 명령은 proc01
이라는 스트림 프로세서에서 샘플링합니다.
sp.proc01.sample()
이 명령은 CTRL-C
을 사용하여 취소할 때까지 또는 반환된 샘플의 누적 크기가 40 MB에 도달할 때까지 계속 실행됩니다. 스트림 프로세서는 샘플의 유효하지 않은 문서를 다음 형식의 _dlqMessage
문서로 보고합니다.
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>' } }
이러한 메시지를 사용하면 데드 레터 큐 컬렉션을 정의하지 않고도 데이터 보안 문제를 진단할 수 있습니다.
스트림 프로세서의 통계 보기
mongosh
을 사용하여 기존 스트림 프로세서의 현재 상태를 요약하는 문서를 반환하려면 sp.<streamprocessor>.stats()
메서드를 사용합니다. streamprocessor
는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 다음과 같은 구문을 가집니다.
sp.<streamprocessor>.stats({options: {<options>}})
여기서 options
은 다음 필드가 있는 선택적 문서입니다.
필드 | 유형 | 설명 |
---|---|---|
scale | integer | 출력의 항목 크기에 사용할 단위입니다. 기본적으로 Atlas Stream Processing은 항목 크기를 바이트 단위로 표시합니다. KB 단위로 표시하려면 1024 의 scale 을 지정합니다. |
verbose | 부울 | 출력 문서의 상세 수준을 지정하는 플래그입니다. true 로 설정하면 출력 문서에는 파이프라인의 각 개별 연산자에 대한 통계를 보고하는 하위 문서가 포함됩니다. 기본값은 false 입니다. |
출력 문서에는 다음과 같은 필드가 있습니다.
필드 | 유형 | 설명 |
---|---|---|
ns | 문자열 | 스트림 프로세서가 정의된 네임스페이스입니다. |
stats | 객체 | 스트림 프로세서의 작동 상태를 설명하는 문서입니다. |
stats.name | 문자열 | 스트림 프로세서의 이름입니다. |
stats.status | 문자열 | 스트림 프로세서의 상태입니다. 이 필드는 다음과 같은 값을 가질 수 있습니다:
|
stats.scaleFactor | integer | 크기 필드가 표시되는 배율입니다. 1 로 설정하면 크기가 바이트 단위로 표시됩니다. 1024 로 설정하면 크기가 킬로바이트 단위로 표시됩니다. |
stats.inputMessageCount | integer | 스트림에 게시된 문서 수입니다. 문서가 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과하면 스트림에 '게시된' 것으로 간주됩니다. |
stats.inputMessageSize | integer | 스트림에 게시된 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과한 후에 스트림에 '게시된' 것으로 간주됩니다. |
stats.outputMessageCount | integer | 스트림에서 처리한 문서 수입니다. 문서가 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다. |
stats.outputMessageSize | integer | 스트림에서 처리한 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다. |
stats.dlqMessageCount | integer | |
stats.dlqMessageSize | integer | |
stats.stateSize | integer | Windows에서 프로세서 상태를 저장하는 데 사용하는 바이트 수입니다. |
stats.watermark | integer | 현재 워터마크의 타임스탬프입니다. |
stats.operatorStats | 배열 | 프로세서 파이프라인의 각 연산자에 대한 통계입니다. Atlas Stream Processing은
또한
|
stats.operatorStats.maxMemoryUsage | integer | 연산자의 최대 메모리 사용량(바이트 또는 킬로바이트)입니다. |
stats.operatorStats.executionTime | integer | 연산자의 총 실행 시간(초)입니다. |
stats.kafkaPartitions | 배열 | Apache Kafka 에 대한 오프셋 정보 브로커의 파티션. 는 kafkaPartitions Apache Kafka 를 사용하는 연결에만 적용됩니다. 소스. |
stats.kafkaPartitions.partition | integer | Apache Kafka 주제 파티션 번호입니다. |
stats.kafkaPartitions.currentOffset | integer | 지정된 파티션에 대해 스트림 프로세서가 있는 오프셋입니다. 이 값은 스트림 프로세서가 처리한 이전 오프셋에 1 을 더한 것과 같습니다. |
stats.kafkaPartitions.checkpointOffset | integer | 스트림 프로세서가 Apache Kafka 에 마지막으로 커밋한 오프셋입니다. 브로커와 지정된 파티션에 대한 체크포인트. 이 오프셋을 통한 모든 메시지는 마지막 체크포인트에 기록됩니다. |
예를 들어 다음은 항목 크기가 KB 단위로 표시되는 inst01
Atlas Stream Processing 인스턴스에서 proc01
스트림 프로세서의 상태를 보여줍니다.
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }