Docs Menu
Docs Home
/
MongoDB Atlas
/

Atlas Stream Processing 시작하기

이 페이지의 내용

  • 전제 조건
  • 절차
  • Atlas 에서 프로젝트의 Stream Processing 페이지로 Go 합니다.
  • Atlas Stream Processing 인스턴스를 생성합니다.
  • Atlas Stream Processing 인스턴스 연결 string 을 가져옵니다.
  • 연결 레지스트리에 MongoDB Atlas 연결을 추가합니다.
  • 스트리밍 데이터 소스가 메시지를 전송하는지 확인합니다.
  • 영구 스트림 프로세서를 생성합니다.
  • 스트림 프로세서를 시작합니다.
  • 스트림 프로세서의 출력을 확인합니다.
  • 스트림 프로세서를 삭제합니다.
  • 다음 단계

이 튜토리얼에서는 Atlas Stream Processing을 설정하고 첫 번째 스트림 프로세서를 실행하는 단계를 안내합니다.

이 튜토리얼을 완료하려면 다음이 필요합니다.

  • Atlas 프로젝트

  • mongosh 버전 2.0 이상

  • 스트림 처리 인스턴스 및 연결 레지스트리를 관리하기 위한 Project Owner 또는 Project Stream Processing Owner 역할이 있는 Atlas 사용자

    참고

    Project Owner 역할이 있으면 데이터베이스 배포를 만들고, 프로젝트 액세스 및 프로젝트 설정을 관리하고, IP 액세스 목록 항목을 관리하는 등의 작업을 수행할 수 있습니다.

    Project Stream Processing Owner 역할은 스트림 처리 인스턴스 보기, 생성, 삭제 및 편집, 연결 레지스트리에 연결 보기, 추가, 수정 및 삭제와 같은 Atlas Stream Processing 작업을 활성화합니다.

    두 역할 간의 차이에 대한 자세한 내용은 프로젝트 역할을 참조하세요.

  • 스트림 프로세서를 생성하고 실행할 수 있는 atlasAdmin 역할이 있는 데이터베이스 사용자

  • Atlas 클러스터

1
  1. 아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.

  2. 아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.

  3. 사이드바에서 Services 제목 아래의 Stream Processing를 클릭합니다.

    스트림 처리 페이지가 표시됩니다.

2
  1. 오른쪽 하단 모서리에 있는 Get Started 을 클릭합니다. Atlas는 핵심 Atlas Stream Processing 구성 요소에 대한 간략한 설명을 제공합니다.

  2. Create instance 버튼을 클릭합니다.

  3. Create a stream processing instance 페이지에서 다음과 같이 인스턴스를 구성합니다.

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Create를 클릭합니다.

3
  1. Atlas Stream Processing 인스턴스의 개요 패널을 찾아 Connect 을 클릭합니다.

  2. I have the MongoDB shell installed0}을 선택합니다.

  3. Select your mongo shell version 드롭다운 메뉴에서 mongosh 의 최신 버전을 선택합니다.

  4. Run your connection string in your command line 아래에 제공된 연결 string 을 복사합니다. 이후 단계에서 이 정보가 필요합니다.

  5. Close를 클릭합니다.

4

이 연결은 스트리밍 데이터 싱크 역할을 합니다.

  1. Atlas Stream Processing 인스턴스의 창에서 Configure 을(를) 클릭합니다.

  2. Connection Registry 탭에서 오른쪽 상단의 + Add Connection 을 클릭합니다.

  3. Atlas Database 을(를) 클릭합니다. Connection Name 필드에 mongodb1 를 입력합니다. Atlas Cluster 드롭다운에서 데이터가 저장되지 않은 Atlas 클러스터를 선택합니다.

  4. Add connection를 클릭합니다.

5

스트림 처리 인스턴스는 sample_stream_solar라는 샘플 데이터 소스에 대한 연결로 미리 구성되어 제공됩니다. 이 소스는 다양한 태양광 발전 장치에서 보고서 스트림을 생성합니다. 각 보고서는 특정 시점에서 단일 태양광 장치의 관찰된 전력량과 온도, 해당 장치의 최대 전력량을 설명합니다.

다음 문서는 대표적인 예입니다.

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
},
_ts: ISODate('2024-08-12T21:41:01.788Z'),
_stream_meta: {
source: {
type: 'generated'
}
}
}

이 소스가 메시지를 내보내는지 확인하려면 스트림 프로세서를 대화식으로 생성합니다.

  1. 원하는 터미널 애플리케이션을 엽니다.

  2. mongosh 을(를) 사용하여 Atlas Stream Processing 인스턴스에 연결합니다.

    이전 단계에서 복사한 mongosh 연결 string 을 터미널에 붙여넣습니다. 여기서 <atlas-stream-processing-url> 는 Atlas Stream Processing 인스턴스의 URL 이고 <username>atlasAdmin 역할을 가진 사용자입니다.

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    메시지가 표시되면 비밀번호를 입력합니다.

  3. 스트림 프로세서를 생성합니다.

    다음 코드를 mongosh 프롬프트에 복사합니다.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar 연결의 데이터가 콘솔에 표시되는지 확인하고 프로세스를 종료합니다.

    sp.process() 으)로 생성한 스트림 프로세서는 종료한 후에는 유지되지 않습니다.

6

집계 파이프라인을 사용하면 수집되는 각 문서를 변환할 수 있습니다. 다음 집계 파이프라인은 1초 간격으로 각 태양광 장치의 최대 온도와 평균, 중앙값, 최대 및 최소 전력량을 도출합니다.

  1. $source 단계를 구성합니다.

    다음 $source 단계는 sample_stream_solar 소스에서 데이터를 인제스트합니다.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  2. $group 단계를 구성합니다.

    다음 $group 단계에서는 들어오는 모든 데이터를 group_id 에 따라 구성하고 각 group_id에 대한 모든 문서의 obs.tempobs.watts 필드 값을 누적한 다음, 원하는 데이터를 파생시킵니다.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $avg: "$obs.temp"
    },
    avg_watts: {
    $min: "$obs.watts"
    },
    median_watts: {
    $min: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  3. $tumblingWindow 단계를 구성합니다.

    스트리밍 데이터에 $group과 같은 누적을 수행하기 위해 Atlas Stream Processing은 을 사용하여 데이터 세트를 제한합니다. 다음 $tumblingWindow 단계에서는 스트림을 연속된 10초 간격으로 분리합니다.

    이는 예를 들어 $group 단계가 median_watts에 대한 값을 계산할 때 이전 10초 동안 수집된 주어진 group_id의 모든 문서에 대해 obs.watts 값을 고려한다는 것을 의미합니다.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  4. $merge 단계를 구성합니다.

    $merge 처리된 스트리밍 데이터를 Atlas 데이터베이스에 기록할 수 있습니다.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  5. 스트림 프로세서를 생성합니다.

    새 스트림 프로세서에 이름을 지정하고, 각 단계를 순서대로 나열하여 집계 파이프라인을 선언합니다. $group 단계는 $tumblingWindow의 중첩된 파이프라인에 속하며 프로세서 파이프라인 정의에 포함해서는 안 됩니다.

    sp.createStreamProcessor("solarDemo", [s, t, m])

이는 solarDemo라는 스트림 프로세서를 생성하여 이전에 정의한 쿼리를 적용하고 처리된 데이터를 연결된 클러스터의 solarDb 데이터베이스의 solarColl 컬렉션에 기록합니다. 이 프로세서는 태양광 장치로부터의 관측 데이터를 10초 간격으로 나누어 도출한 다양한 측정값을 반환합니다.

Atlas Stream Processing이 미사용 데이터베이스에 쓰는 방법을 자세히 알아보려면 $merge를 참조하세요.

7

mongosh에서 다음 명령을 실행합니다.

sp.solarDemo.start()
8

프로세서가 활성 상태인지 확인하려면 mongosh에서 다음 명령을 실행합니다.

sp.solarDemo.stats()

이 명령은 solarDemo 스트림 프로세서의 작동 통계를 보고합니다.

스트림 프로세서가 Atlas 클러스터에 데이터를 쓰고 있는지 확인하려면 다음을 수행하세요.

  1. Atlas에서 프로젝트의 Clusters 페이지로 이동합니다.

    1. 아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 원하는 프로젝트가 포함된 조직을 선택합니다.

    2. 아직 표시되지 않은 경우 탐색 표시줄의 Projects 메뉴에서 원하는 프로젝트를 선택합니다.

    3. 아직 표시되지 않은 경우 사이드바에서 Clusters를 클릭합니다.

      Clusters(클러스터) 페이지가 표시됩니다.

  2. cluster의 Browse Collections 버튼을 클릭합니다.

    데이터 탐색기 가 표시됩니다.

  3. MySolar 컬렉션을 확인하세요.

또는 mongosh를 사용하여 처리된 문서의 샘플을 터미널에 표시할 수 있습니다.

sp.solarDemo.sample()
{
_id: 10,
max_watts: 136,
min_watts: 130,
avg_watts: 133,
median_watts: 130,
max_temp: 7,
_stream_meta: {
source: {
type: 'generated'
},
window: {
start: ISODate('2024-08-12T22:49:05.000Z'),
end: ISODate('2024-08-12T22:49:10.000Z')
}
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

9

mongosh에서 다음 명령을 실행합니다.

sp.solarDemo.drop()

avgWatts 삭제했는지 확인하려면 사용 가능한 모든 스트림 프로세서를 나열합니다.

sp.listStreamProcessors()

방법 알아보기:

돌아가기

개요