문서 메뉴
문서 홈
/
MongoDB 아틀라스
/

Atlas Stream Processing 시작하기

이 페이지의 내용

  • 전제 조건
  • 절차
  • Atlas 에서 프로젝트의 Stream Processing 페이지로 Go 합니다.
  • Atlas Stream Processing 인스턴스를 생성합니다.
  • Atlas Stream Processing 인스턴스 연결 string 을 가져옵니다.
  • 연결 레지스트리에 샘플 스트림 연결을 추가합니다.
  • 연결 레지스트리에 MongoDB Atlas 연결을 추가합니다.
  • 스트리밍 데이터 소스가 메시지를 전송하는지 확인합니다.
  • 영구 스트림 프로세서를 생성합니다.
  • 스트림 프로세서를 시작합니다.
  • 스트림 프로세서의 출력을 확인합니다.
  • 스트림 프로세서를 삭제합니다.
  • 다음 단계

이 튜토리얼에서는 Atlas Stream Processing을 설정하고 첫 번째 스트림 프로세서를 실행하는 단계를 안내합니다.

이 튜토리얼을 완료하려면 다음이 필요합니다.

  • Atlas 프로젝트

  • mongosh 버전 2.0 이상

  • 스트림 처리 인스턴스 및 연결 레지스트리를 관리할 수 있는 Project Owner 또는 Project Stream Processing Owner 역할이 있는 Atlas 사용자

    참고

    Project Owner 역할이 있으면 데이터베이스 배포를 만들고, 프로젝트 액세스 및 프로젝트 설정을 관리하고, IP 액세스 목록 항목을 관리하는 등의 작업을 수행할 수 있습니다.

    Project Stream Processing Owner 역할은 스트림 처리 인스턴스 보기, 생성, 삭제 및 편집, 연결 레지스트리에서 연결 보기, 추가, 수정 및 삭제와 같은 Atlas Stream Processing 작업을 활성화합니다.

    두 역할의 차이점에 대해 자세히 알아보려면 프로젝트 역할 을 참조하세요.

  • 스트림 프로세서를 생성하고 실행할 수 있는 atlasAdmin 역할이 있는 데이터베이스 사용자

  • Atlas 클러스터

1
  1. 아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.

  2. 아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.

  3. 사이드바에서 Services 제목 아래의 Stream Processing 을 클릭합니다.

2
  1. 오른쪽 하단 모서리에 있는 Get Started 을 클릭합니다. Atlas는 핵심 Atlas Stream Processing 구성 요소에 대한 간략한 설명을 제공합니다.

  2. Create instance 버튼을 클릭합니다.

  3. Create a stream processing instance 페이지에서 다음과 같이 인스턴스를 구성합니다.

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Create를 클릭합니다.

3
  1. Atlas Stream Processing 인스턴스의 개요 패널을 찾아 Connect 을 클릭합니다.

  2. I have the MongoDB shell installed0}을 선택합니다.

  3. Select your mongo shell version 드롭다운 메뉴에서 mongosh 의 최신 버전을 선택합니다.

  4. Run your connection string in your command line 아래에 제공된 연결 string 을 복사합니다. 이후 단계에서 이 정보가 필요합니다.

  5. Close를 클릭합니다.

4

이 연결은 스트리밍 데이터 소스 역할을 합니다.

  1. Atlas Stream Processing 인스턴스의 창에서 Configure 을(를) 클릭합니다.

  2. Add sample connection. 을(를) 클릭하면 Sample Stream 유형의 sample_stream_solar 연결이 생성됩니다. Connection Registry 탭을 클릭하여 연결이 존재하는지 확인합니다.

5

이 연결은 스트리밍 데이터 싱크 역할을 합니다.

  1. Atlas Stream Processing 인스턴스의 창에서 Configure 을(를) 클릭합니다.

  2. Connection Registry 탭에서 오른쪽 상단의 + Add Connection 을 클릭합니다.

  3. Atlas Database 을(를) 클릭합니다. Connection Name 필드에 mongodb1 를 입력합니다. Atlas Cluster 드롭다운에서 데이터가 저장되지 않은 Atlas 클러스터를 선택합니다.

  4. Add connection를 클릭합니다.

6

이렇게 하려면 최소 스트림 프로세서를 만듭니다.

  1. 원하는 터미널 애플리케이션을 엽니다.

  2. mongosh 을(를) 사용하여 Atlas Stream Processing 인스턴스에 연결합니다.

    이전 단계에서 복사한 mongosh 연결 string 을 터미널에 붙여넣습니다. 여기서 <atlas-stream-processing-url> 는 Atlas Stream Processing 인스턴스의 URL 이고 <username>atlasAdmin 역할을 가진 사용자입니다.

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    메시지가 표시되면 비밀번호를 입력합니다.

  3. 스트림 프로세서를 생성합니다.

    다음 코드를 mongosh 프롬프트에 복사합니다.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar 연결의 데이터가 콘솔에 표시되는지 확인하고 프로세스를 종료합니다.

    sp.process() 으)로 생성한 스트림 프로세서는 종료한 후에는 유지되지 않습니다.

7

다음 코드를 mongosh 프롬프트에 복사합니다.

let s = {
$source: {
connectionName: "sample_stream_solar",
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
}
let u = {
$unwind: {
path: "$obs"
}
}
let g = {
$group: {
_id: "$group_id",
max: {
$max: "$obs.watts"
},
avg: {
$avg: "$obs.watts"
}
}
}
let t = {
$tumblingWindow: {
interval: {
size: NumberInt(1),
unit: "second"
},
pipeline: [u, g]
}
}
let m = {
$merge: {
into: {
connectionName: "mongodb1",
db: "MySolar",
coll: "solar"
}
}
}
sp.createStreamProcessor("avgWatts", [s, t, m])

이렇게 하면 이전에 정의된 쿼리를 적용하고 처리된 데이터를 연결한 클러스터에 있는 MySolar 데이터베이스의 solar 컬렉션에 쓰는 avgWatts 라는 이름의 스트림 프로세서가 생성됩니다. 1 시간 간격으로 관찰된 모든 태양광 패널의 평균 및 피크 와트수를 반환합니다.

Atlas Stream Processing이 미사용 데이터베이스에 쓰는 방법에 대해 자세히 알아보려면 $merge 를 참조하세요.

8

mongosh에서 다음 명령을 실행합니다.

sp.avgWatts.start()
9

프로세서가 활성 상태인지 확인하려면 mongosh에서 다음 명령을 실행합니다.

sp.avgWatts.stats()

이 명령은 avgWatts 스트림 프로세서의 작동 통계를 보고합니다.

스트림 프로세서가 Atlas 클러스터에 데이터를 쓰고 있는지 확인하려면 다음 단계를 따르세요.

  1. Atlas에서 프로젝트의 Clusters 페이지로 이동합니다.

    1. 이미 표시되어 있지 않은 경우 탐색 모음의 Organizations 메뉴에서 원하는 프로젝트가 포함된 조직을 선택합니다.

    2. 이미 표시되어 있지 않은 경우 Projects 탐색 모음의 프로젝트 메뉴에서 원하는 프로젝트를 선택합니다.

    3. Clusters 페이지가 아직 표시되지 않은 경우 사이드바에서 Database를 클릭합니다.

  2. cluster의 Browse Collections 버튼을 클릭합니다.

  3. MySolar 컬렉션을 봅니다.

10

mongosh에서 다음 명령을 실행합니다.

sp.avgWatts.drop()

avgWatts 삭제했는지 확인하려면 사용 가능한 모든 스트림 프로세서를 나열합니다.

sp.listStreamProcessors()

방법 알아보기:

← 개요