Docs Menu
Docs Home
/
MongoDB Atlas
/

스트림 프로세서 관리

이 페이지의 내용

  • 전제 조건
  • 고려 사항
  • 대화형으로 스트림 프로세서 만들기
  • Atlas Stream Processing 인스턴스에 연결합니다.
  • 파이프라인을 정의합니다.
  • 스트림 프로세서를 생성합니다.
  • 스트림 프로세서 생성
  • 스트림 프로세서 시작
  • 스트림 프로세서 중지
  • 스트림 프로세서 제거
  • 사용 가능한 스트림 프로세서 목록
  • 스트림 프로세서의 샘플
  • 스트림 프로세서의 통계 보기

Atlas Stream Processing 스트림 프로세서는 고유한 이름이 지정된 스트림 집계 파이프라인의 논리를 스트리밍 데이터에 적용합니다. Atlas Stream Processing은 각 스트림 프로세서 정의를 재사용할 수 있도록 영구 스토리지에 저장합니다. 정의가 저장된 스트림 처리 인스턴스에서만 지정된 스트림 프로세서를 사용할 수 있습니다. Atlas Stream Processing은 작업자당 최대 4개 스트림 프로세서를 지원합니다. 이 제한을 초과하는 추가 프로세서에 대해 Atlas Stream Processing은 새 리소스를 할당합니다.

스트림 프로세서를 생성하고 managed 다음이 필요합니다.

대부분의 스트림 프로세서 명령은 메서드 호출에서 관련 스트림 프로세서의 이름을 지정해야 합니다. 다음 섹션에서 설명하는 구문은 엄밀히 영숫자 이름을 가정합니다. 스트림 프로세서의 이름에 하이픈(-) 또는 마침표(.)와 같은 영숫자가 아닌 문자가 포함된 경우, 이름을 대괄호([])와 큰따옴표("")로 묶어야 합니다. 메서드 호출(예: sp.["special-name-stream"].stats().

sp.process() 메서드를 사용하여 대화형으로 스트림 프로세서를 생성할 수 있습니다. 대화형으로 생성한 스트림 프로세서는 다음과 같은 동작을 보입니다.

  • 출력 및 데드 레터 큐 문서를 shell에 쓰기

  • 생성 즉시 실행 시작

  • 10분 동안 실행하거나 사용자가 중지할 때까지 실행합니다.

  • 중단 후 계속 진행하지 마세요

대화형으로 생성된 스트림 프로세서는 프로토타이핑을 위한 것입니다. 지속적인 스트림 프로세서를 생성하려면 스트림 프로세서 생성을 참조하세요.

sp.process() 의 구문은 다음과 같습니다:

sp.process(<pipeline>)
필드
유형
필요성
설명
pipeline
배열
필수 사항
스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .
1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

예시

다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner 이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline이라는 변수에 할당합니다.

다음 예제에서는 연결 레지스트리에서 myKafka 연결의 stuff 주제를 $source 로 사용하여 temperature 필드의 값이 46 인 레코드와 일치시키고, 처리된 메시지를 output 로 내보냅니다. 연결 레지스트리의 mySink 연결 주제:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

다음 명령은 pipeline 에 정의된 로직을 적용하는 스트림 프로세서를 만듭니다.

sp.process(pipeline)

스트림 프로세서를 만들려면:

Atlas Administration API는 스트림 프로세서를 생성하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 생성

mongosh 을 사용하여 새 스트림 프로세서를 만들려면 sp.createStreamProcessor() 메서드를 사용합니다. 다음과 같은 구문을 가집니다.

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
유형
필요성
설명
name
문자열
필수 사항
스트림 프로세서의 논리적 이름입니다. 이름은 스트림 처리 인스턴스 내에서 고유해야 합니다. 이 이름에는 영숫자만 포함되어야 합니다.
pipeline
배열
필수 사항
스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .
options
객체
옵션
스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다.
options.dlq
객체
조건부
Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다.
options.dlq.connectionName
문자열
조건부
연결 레지스트리에서 연결을 식별하는 사람이 읽을 수 있는 레이블입니다. 이 연결은 Atlas cluster를 참고해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.db
문자열
조건부
options.dlq.connectionName에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
options.dlq.coll
문자열
조건부
options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.
1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

예시

다음 명령어는 SCRAM-SHA-256 인증을 사용하여 streamOwner 이)라는 사용자로 Atlas Stream Processing 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline이라는 변수에 할당합니다.

다음 예제에서는 연결 레지스트리에서 myKafka 연결의 stuff 주제를 $source 로 사용하여 temperature 필드의 값이 46 인 레코드와 일치시키고, 처리된 메시지를 output 로 내보냅니다. 연결 레지스트리의 mySink 연결 주제:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh 프롬프트에서 DLQ의 다음 속성이 포함된 객체를 할당합니다.

  • connectionName

  • 데이터베이스 이름

  • 컬렉션 이름

다음 예제에서는 metadata.dlq 데이터베이스 collection에서 cluster01 연결을 통한 DLQ를 정의합니다.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

다음 명령어는 pipeline 에 정의된 로직을 적용하는 proc01 라는 이름의 스트림 프로세서를 만듭니다. 처리 중 오류가 발생한 문서는 deadLetter 에 정의된 DLQ에 기록됩니다.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

참고

Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.

스트림 프로세서를 시작하려면:

Atlas 관리 API는 스트림 프로세서를 시작하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 시작

mongosh로 기존 스트림 프로세서를 시작하려면 sp.<streamprocessor>.start() 메서드를 사용합니다. <streamprocessor>는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 시작하려면 다음 명령을 실행합니다.

sp.proc01.start()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하고 현재 실행 중이 아닐 때

  • false 존재하지 않거나 존재하지만 현재 실행 중인 스트림 프로세서를 시작하려고 하는 경우

참고

Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.

스트림 프로세서를 중지하려면:

Atlas Administration API는 스트림 프로세서를 중지하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 중지

mongosh로 기존 스트림 프로세서를 중지하려면 sp.<streamprocessor>.stop() 메서드를 사용합니다. <streamprocessor>는 현재 스트림 처리 인스턴스에 대해 정의되고 현재 실행 중인 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 중지하려면 다음 명령을 실행합니다.

sp.proc01.stop()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하고 현재 실행 중인 경우

  • false 스트림 프로세서가 존재하지 않거나 스트림 프로세서가 현재 실행 중이 아닐 때

스트림 프로세서를 삭제하려면:

Atlas 관리 API는 스트림 프로세서를 삭제하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 삭제

mongosh로 기존 스트림 프로세서를 삭제하려면 sp.<streamprocessor>.drop() 메서드를 사용합니다. <streamprocessor>는 현재 스트림 처리 인스턴스에 대해 정의된 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01 이라는 스트림 프로세서를 삭제하려면 다음 명령을 실행합니다.

sp.proc01.drop()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하는 경우.

  • false 스트림 프로세서가 존재하지 않는 경우.

스트림 프로세서를 삭제하면 Atlas Stream Processing이 이에 대해 프로비저닝한 모든 리소스와 저장된 모든 상태가 함께 폐기됩니다.

사용 가능한 모든 스트림 프로세서를 나열하려면 다음을 수행합니다.

Atlas Administration API는 사용 가능한 모든 스트림 프로세서를 나열하는 엔드포인트를 제공합니다.

단일 스트림 프로세서 삭제

현재 스트림 처리 인스턴스에서 mongosh로 사용 가능한 모든 스트림 프로세서를 나열하려면 sp.listStreamProcessors() 메서드를 사용하세요. 이는 각 스트림 프로세서와 관련된 이름, 시작 시간, 현재 상태 및 파이프라인이 포함된 문서 목록을 반환합니다. 다음 구문을 포함합니다.

sp.listStreamProcessors(<filter>)

<filter> 목록을 필터링할 필드를 지정하는 문서입니다.

예시

다음 예는 필터링되지 않은 요청의 반환 값을 보여줍니다.

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

동일한 Atlas Stream Processing 인스턴스에서 명령을 다시 실행하여 "running""state" 에 대해 필터링하면 다음 출력이 표시됩니다.

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

기존 스트림 프로세서에서 mongosh로 샘플링된 결과 배열을 STDOUT로 반환하려면 sp.<streamprocessor>.sample() 메서드를 사용하세요. <streamprocessor>는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 예를 들어 다음 명령은 proc01이라는 스트림 프로세서에서 샘플을 추출합니다.

sp.proc01.sample()

이 명령은 CTRL-C를 사용하여 취소하거나 반환된 샘플의 크기가 누적 40MB에 도달할 때까지 계속 실행됩니다. 스트림 프로세서는 샘플의 잘못된 문서를 다음 형식의 _dlqMessage 문서로 보고합니다.

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

데드 레터 큐 컬렉션을 정의하지 않고도 이러한 메시지를 사용하여 데이터 위생 문제를 진단할 수 있습니다.

참고

Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.

스트림 프로세서의 통계를 보려면:

Atlas Administration API는 스트림 프로세서의 통계를 볼 수 있는 엔드포인트를 제공합니다.

단일 스트림 프로세서 가져오기

mongosh를 사용하여 기존 스트림 프로세서의 현재 상태를 요약하는 문서를 반환하려면 sp.<streamprocessor>.stats() 메서드를 사용하세요. streamprocessor는 현재 스트림 처리 인스턴스에 대해 정의된 현재 실행 중인 스트림 프로세서의 이름이어야 합니다. 다음 구문을 포함합니다.

sp.<streamprocessor>.stats({options: {<options>}})

여기서 options 은 다음 필드가 있는 선택적 문서입니다.

필드
유형
설명
scale
integer
출력의 항목 크기에 사용할 단위입니다. 기본적으로 Atlas Stream Processing은 항목 크기를 바이트 단위로 표시합니다. KB 단위로 표시하려면 1024scale 을 지정합니다.
verbose
부울
출력 문서의 상세 수준을 지정하는 플래그입니다. true 로 설정하면 출력 문서에는 파이프라인의 각 개별 연산자에 대한 통계를 보고하는 하위 문서가 포함됩니다. 기본값은 false 입니다.

출력 문서에는 다음과 같은 필드가 있습니다.

필드
유형
설명
ns
문자열
스트림 프로세서가 정의된 네임스페이스입니다.
stats
객체
스트림 프로세서의 작동 상태를 설명하는 문서입니다.
stats.name
문자열
스트림 프로세서의 이름입니다.
stats.status
문자열

스트림 프로세서의 상태입니다. 이 필드는 다음과 같은 값을 가질 수 있습니다:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
integer
크기 필드가 표시되는 배율입니다. 1 로 설정하면 크기가 바이트 단위로 표시됩니다. 1024 로 설정하면 크기가 킬로바이트 단위로 표시됩니다.
stats.inputMessageCount
integer
스트림에 게시된 문서 수입니다. 문서가 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과하면 스트림에 '게시된' 것으로 간주됩니다.
stats.inputMessageSize
integer
스트림에 게시된 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과한 후에 스트림에 '게시된' 것으로 간주됩니다.
stats.outputMessageCount
integer
스트림에서 처리한 문서 수입니다. 문서가 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.
stats.outputMessageSize
integer
스트림에서 처리한 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.
stats.dlqMessageCount
integer
stats.dlqMessageSize
integer
stats.changeStreamTimeDifferenceSecs
integer
가장 최근의 변경 스트림 재개 토큰이 나타내는 이벤트 시간과 oplog의 최신 이벤트 간의 차이(초)입니다.
stats.changeStreamState
token
가장 최근의 변경 스트림 재개 토큰입니다. 변경 스트림 소스가 있는 스트림 프로세서에만 적용됩니다.
stats.stateSize
integer
Windows에서 프로세서 상태를 저장하는 데 사용하는 바이트 수입니다.
stats.watermark
integer
현재 워터마크의 타임스탬프입니다.
stats.operatorStats
배열

프로세서 파이프라인의 각 연산자에 대한 통계입니다. Atlas Stream Processing은 verbose 옵션을 전달한 경우에만 이 필드를 반환합니다.

stats.operatorStats 다양한 핵심 stats 필드의 연산자별 버전을 제공합니다.

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

또한 stats.operatorStats 에는 다음과 같은 고유 필드가 포함되어 있습니다.

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
integer
연산자의 최대 메모리 사용량(바이트 또는 킬로바이트)입니다.
stats.operatorStats.executionTime
integer
연산자의 총 실행 시간(초)입니다.
stats.kafkaPartitions
배열
Apache Kafka 브로커 파티션에 대한 오프셋 정보. kafkaPartitionsApache Kafka 소스를 사용하는 연결에만 적용됩니다.
stats.kafkaPartitions.partition
integer
Apache Kafka 주제 파티션 번호.
stats.kafkaPartitions.currentOffset
integer
지정된 파티션에 대해 스트림 프로세서가 있는 오프셋입니다. 이 값은 스트림 프로세서가 처리한 이전 오프셋에 1 을 더한 것과 같습니다.
stats.kafkaPartitions.checkpointOffset
integer
스트림 프로세서가 Apache Kafka 에 마지막으로 커밋한 오프셋입니다. 브로커와 지정된 파티션에 대한 체크포인트. 이 오프셋을 통한 모든 메시지는 마지막 체크포인트에 기록됩니다.

예를 들어 다음은 항목 크기가 KB 단위로 표시되는 inst01 Atlas Stream Processing 인스턴스에서 proc01 스트림 프로세서의 상태를 보여줍니다.

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

돌아가기

연결 관리