스트림 프로세서 관리
이 페이지의 내용
Atlas Stream Processing 스트림 프로세서는 고유한 이름이 지정된 스트림 집계 파이프라인의 논리를 스트리밍 데이터에 적용합니다. Atlas Stream Processing은 각 스트림 프로세서 정의를 재사용할 수 있도록 영구 스토리지에 저장합니다. 정의가 저장된 스트림 처리 인스턴스에서만 지정된 스트림 프로세서를 사용할 수 있습니다. Atlas Stream Processing은 작업자당 최대 4개 스트림 프로세서를 지원합니다. 이 제한을 초과하는 추가 프로세서에 대해 Atlas Stream Processing은 새 리소스를 할당합니다.
전제 조건
스트림 프로세서를 생성하고 managed 다음이 필요합니다.
mongosh
버전 2.0 이상스트림 프로세서를 생성하고 실행할 수 있는
atlasAdmin
역할이 있는 데이터베이스 사용자Atlas 클러스터
고려 사항
대부분의 스트림 프로세서 명령은 메서드 호출에서 관련 스트림 프로세서의 이름을 지정해야 합니다. 다음 섹션에서 설명하는 구문은 엄밀히 영숫자 이름을 가정합니다. 스트림 프로세서의 이름에 하이픈(-
) 또는 마침표(.
)와 같은 영숫자가 아닌 문자가 포함된 경우, 이름을 대괄호([]
)와 큰따옴표(""
)로 묶어야 합니다. 메서드 호출(예: sp.["special-name-stream"].stats()
.
대화형으로 스트림 프로세서 만들기
sp.process()
메서드를 사용하여 대화형으로 스트림 프로세서를 생성할 수 있습니다. 대화형으로 생성한 스트림 프로세서는 다음과 같은 동작을 보입니다.
출력 및 데드 레터 큐 문서를 shell에 쓰기
생성 즉시 실행 시작
10분 동안 실행하거나 사용자가 중지할 때까지 실행합니다.
중단 후 계속 진행하지 마세요
대화형으로 생성된 스트림 프로세서는 프로토타이핑을 위한 것입니다. 지속적인 스트림 프로세서를 생성하려면 스트림 프로세서 생성을 참조하세요.
sp.process()
의 구문은 다음과 같습니다:
sp.process(<pipeline>)
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 배열 | 필수 사항 | 스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 . |
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh
을(를) 사용하여 연결합니다.
예시
다음 명령은 x.059 인증을 사용하여 streamOwner
라는 사용자로 스트림 처리 인스턴스에 연결합니다.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
메시지가 표시되면 사용자 비밀번호를 입력합니다.
파이프라인을 정의합니다.
mongosh
프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline
이라는 변수에 할당합니다.
다음 예제에서는 연결 레지스트리에서 myKafka
연결의 stuff
주제를 $source
로 사용하여 temperature
필드의 값이 46
인 레코드와 일치시키고, 처리된 메시지를 output
로 내보냅니다. 연결 레지스트리의 mySink
연결 주제:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
스트림 프로세서 생성
스트림 프로세서를 생성하려면 다음을 수행합니다.
Atlas Administration API는 스트림 프로세서를 생성하기 위한 엔드포인트를 제공합니다.
mongosh
을 사용하여 새 스트림 프로세서를 만들려면 sp.createStreamProcessor()
메서드를 사용합니다. 다음과 같은 구문을 가집니다.
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 | 스트림 프로세서의 논리적 이름입니다. 이름은 스트림 처리 인스턴스 내에서 고유해야 합니다. 이 이름에는 영숫자만 포함되어야 합니다. |
| 배열 | 필수 사항 | 스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 . |
| 객체 | 옵션 | 스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다. |
| 객체 | 조건부 | Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 |
| 문자열 | 조건부 | 연결 레지스트리에서 연결을 식별하는 사람이 읽을 수 있는 레이블입니다. 이 연결은 Atlas cluster를 참고해야 합니다. 이 필드는 |
| 문자열 | 조건부 |
|
| 문자열 | 조건부 |
|
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh
을(를) 사용하여 연결합니다.
예시
다음 명령은 x.059 인증을 사용하여 streamOwner
라는 사용자로 스트림 처리 인스턴스에 연결합니다.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
메시지가 표시되면 사용자 비밀번호를 입력합니다.
파이프라인을 정의합니다.
mongosh
프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline
이라는 변수에 할당합니다.
다음 예제에서는 연결 레지스트리에서 myKafka
연결의 stuff
주제를 $source
로 사용하여 temperature
필드의 값이 46
인 레코드와 일치시키고, 처리된 메시지를 output
로 내보냅니다. 연결 레지스트리의 mySink
연결 주제:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
스트림 프로세서 시작
참고
스트림 프로세서를 시작하려면:
Atlas Administration API는 스트림 프로세서를 시작하기 위한 엔드포인트를 제공합니다.
mongosh
로 기존 스트림 프로세서를 시작하려면 sp.<streamprocessor>.start()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 시작하려면 다음 명령을 실행합니다.
sp.proc01.start()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하고 현재 실행 중이 아닐 때false
존재하지 않거나 존재하지만 현재 실행 중인 스트림 프로세서를 시작하려고 하는 경우
스트림 프로세서 중지
참고
스트림 프로세서를 중지하려면:
Atlas Administration API는 스트림 프로세서를 중지하기 위한 엔드포인트를 제공합니다.
mongosh
로 기존 스트림 프로세서를 중지하려면 sp.<streamprocessor>.stop()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의되고 현재 실행 중인 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 중지하려면 다음 명령을 실행합니다.
sp.proc01.stop()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하고 현재 실행 중인 경우false
스트림 프로세서가 존재하지 않거나 스트림 프로세서가 현재 실행 중이 아닐 때
스트림 프로세서 수정
기존 스트림 프로세서의 다음 요소를 수정할 수 있습니다.
스트림 프로세서를 수정하려면 다음을 수행합니다.
스트림 프로세서에 업데이트 를 적용합니다.
기본값 으로 수정된 프로세서는 마지막 체크포인트 에서 복원 됩니다. 또는 프로세서가 요약 통계만 유지하는 resumeFromCheckpoint=false
를 설정하다 수 있습니다. 창이 열린 상태에서 프로세서를 수정하면 업데이트된 파이프라인 에서 창이 완전히 다시 계산됩니다.
제한 사항
기본값 설정 resumeFromCheckpoint=true
이 활성화되면 다음과 같은 제한 사항 적용.
$source
단계는 수정할 수 없습니다.창 간격은 수정할 수 없습니다.
창 은 제거 할 수 없습니다.
창 내부 파이프라인 에
$group
또는$sort
단계가 있는 경우에만 창 있는 파이프라인 을 수정할 수 있습니다.기존 창 유형은 변경할 수 없습니다. 예를 예시
$tumblingWindow
에서$hoppingWindow
로 또는 그 반대로 변경할 수 없습니다.창이 있는 프로세서는 창을 다시 계산한 결과 일부 데이터를 다시 처리할 수 있습니다.
스트림 프로세서를 수정하려면 다음을 수행합니다.
mongosh
v2.3.4+가 필요합니다.
sp.<streamprocessor>.modify()
명령을 사용하여 기존 스트림 프로세서를 수정합니다. <streamprocessor>
은 현재 스트림 처리 인스턴스 에 대해 정의된 중지된 스트림 프로세서의 이름이어야 합니다.
예를 예시 proc01
이라는 스트림 프로세서를 수정하려면 다음 명령을 실행 합니다.
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
기존 파이프라인에 단계 추가
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
스트림 프로세서의 입력 소스 수정
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
스트림 프로세서에서 데드 레터 대기열 제거
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
창을 사용하여 스트림 프로세서 수정
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
Atlas 관리 API 는 스트림 프로세서를 수정하기 위한 엔드포인트를 제공합니다.
스트림 프로세서 제거
스트림 프로세서를 삭제하려면 다음을 수행합니다.
Atlas Administration API는 스트림 프로세서를 삭제하기 위한 엔드포인트를 제공합니다.
mongosh
로 기존 스트림 프로세서를 삭제하려면 sp.<streamprocessor>.drop()
메서드를 사용합니다. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.
예를 들어 proc01
이라는 스트림 프로세서를 삭제하려면 다음 명령을 실행합니다.
sp.proc01.drop()
이 메서드는 다음을 반환합니다.
true
스트림 프로세서가 존재하는 경우.false
스트림 프로세서가 존재하지 않는 경우.
스트림 프로세서를 삭제하면 Atlas Stream Processing이 이에 대해 프로비저닝한 모든 리소스와 저장된 모든 상태가 함께 폐기됩니다.
사용 가능한 스트림 프로세서 목록
사용 가능한 모든 스트림 프로세서를 나열하려면 다음을 수행합니다.
Atlas Administration API는 사용 가능한 모든 스트림 프로세서를 나열하는 엔드포인트를 제공합니다.
현재 스트림 처리 인스턴스에서 mongosh
로 사용 가능한 모든 스트림 프로세서를 나열하려면 sp.listStreamProcessors()
메서드를 사용하세요. 이는 각 스트림 프로세서와 관련된 이름, 시작 시간, 현재 상태 및 파이프라인이 포함된 문서 목록을 반환합니다. 다음 구문을 포함합니다.
sp.listStreamProcessors(<filter>)
<filter>
목록을 필터링할 필드를 지정하는 문서입니다.
예시
다음 예는 필터링되지 않은 요청의 반환 값을 보여줍니다.
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
동일한 Atlas Stream Processing 인스턴스에서 명령을 다시 실행하여 "running"
의 "state"
에 대해 필터링하면 다음 출력이 표시됩니다.
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
스트림 프로세서의 샘플
기존 스트림 프로세서에서 mongosh
로 샘플링된 결과 배열을 STDOUT
로 반환하려면 sp.<streamprocessor>.sample()
메서드를 사용하세요. <streamprocessor>
는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 예를 들어 다음 명령은 proc01
이라는 스트림 프로세서에서 샘플을 추출합니다.
sp.proc01.sample()
이 명령은 CTRL-C
를 사용하여 취소하거나 반환된 샘플의 크기가 누적 40MB에 도달할 때까지 계속 실행됩니다. 스트림 프로세서는 샘플의 잘못된 문서를 다음 형식의 _dlqMessage
문서로 보고합니다.
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
데드 레터 큐 컬렉션을 정의하지 않고도 이러한 메시지를 사용하여 데이터 위생 문제를 진단할 수 있습니다.
스트림 프로세서의 통계 보기
참고
스트림 프로세서의 통계를 보려면:
Atlas Administration API는 스트림 프로세서의 통계를 볼 수 있는 엔드포인트를 제공합니다.
mongosh
를 사용하여 기존 스트림 프로세서의 현재 상태를 요약하는 문서를 반환하려면 sp.<streamprocessor>.stats()
메서드를 사용하세요. streamprocessor
는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 다음 구문을 포함합니다.
sp.<streamprocessor>.stats({options: {<options>}})
여기서 options
은 다음 필드가 있는 선택적 문서입니다.
필드 | 유형 | 설명 |
---|---|---|
| integer | 출력의 항목 크기에 사용할 단위입니다. 기본적으로 Atlas Stream Processing은 항목 크기를 바이트 단위로 표시합니다. KB 단위로 표시하려면 |
| 부울 | 출력 문서의 상세 수준을 지정하는 플래그입니다. |
출력 문서에는 다음과 같은 필드가 있습니다.
필드 | 유형 | 설명 |
---|---|---|
| 문자열 | 스트림 프로세서가 정의된 네임스페이스입니다. |
| 객체 | 스트림 프로세서의 작동 상태를 설명하는 문서입니다. |
| 문자열 | 스트림 프로세서의 이름입니다. |
| 문자열 | 스트림 프로세서의 상태입니다. 이 필드는 다음과 같은 값을 가질 수 있습니다:
|
| integer | 크기 필드가 표시되는 배율입니다. |
| integer | 스트림에 게시된 문서 수입니다. 문서가 전체 파이프라인을 통과할 때가 아니라 |
| integer | 스트림에 게시된 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과할 때가 아니라 |
| integer | 스트림에서 처리한 문서 수입니다. 문서가 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다. |
| integer | 스트림에서 처리한 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다. |
| integer | |
| integer | |
| integer | |
| token | 가장 최근의 변경 스트림 재개 토큰입니다. 변경 스트림 소스가 있는 스트림 프로세서에만 적용됩니다. |
| integer | Windows에서 프로세서 상태를 저장하는 데 사용하는 바이트 수입니다. |
| integer | 현재 워터마크의 타임스탬프입니다. |
| 배열 | 프로세서 파이프라인의 각 연산자에 대한 통계입니다. Atlas Stream Processing은
|
| integer | 연산자의 최대 메모리 사용량(바이트 또는 킬로바이트)입니다. |
| integer | 연산자의 총 실행 시간(초)입니다. |
| 날짜 | The start time of the minimum open window. This value is optional. |
| 날짜 | The start time of the maximum open window. This value is optional. |
| 배열 | Apache Kafka 브로커 파티션에 대한 오프셋 정보. |
| integer | Apache Kafka 주제 파티션 번호. |
| integer | 지정된 파티션에 대해 스트림 프로세서가 있는 오프셋입니다. 이 값은 스트림 프로세서가 처리한 이전 오프셋에 |
| integer | 스트림 프로세서가 Apache Kafka 에 마지막으로 커밋한 오프셋입니다. 브로커와 지정된 파티션에 대한 체크포인트. 이 오프셋을 통한 모든 메시지는 마지막 체크포인트에 기록됩니다. |
| 부울 | The flag that indicates whether the partition is idle. This value defaults to |
예를 들어 다음은 항목 크기가 KB 단위로 표시되는 inst01
Atlas Stream Processing 인스턴스에서 proc01
스트림 프로세서의 상태를 보여줍니다.
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }