MongoDB Change Streams 살펴보기
이 페이지의 내용
이 튜토리얼을 따라 MongoDB 컬렉션에서 변경 스트림을 생성하고 생성하는 변경 이벤트를 관찰하는 방법을 알아보세요.
Change Streams 살펴보기
튜토리얼 설정 완료
Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.
Docker 컨테이너에 연결
Container shell 튜토리얼에서 대화형 세션 2개를 Docker 각각 별도의 창에 생성합니다.
터미널에서 대화식 셸을 시작하려면 다음 명령을 실행합니다.
docker exec -it mongo1 /bin/bash
이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell1이라고 합니다.
대화형 셸을 시작하려면 두 번째 터미널에서 다음 명령을 실행합니다.
docker exec -it mongo1 /bin/bash
이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell2이라고 합니다.
변경 스트림 열기
ChangeStreamShell1 에서 PyMongo 드라이버를 사용하여 변경 스트림을 여는 Python 스크립트를 만듭니다.
nano openchangestream.py
다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Python 스크립트를 실행합니다.
python3 openchangestream.py
스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
변경 이벤트 trigger
ChangeStreamShell2 에서 MongoDB Shell인 mongosh
을(를) 사용하여 MongoDB에 연결하고 다음 명령을 사용합니다.
mongosh "mongodb://mongo1"
연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.
rs0 [direct: primary] test>
프롬프트에서 다음 명령을 입력합니다.
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
앞의 명령을 입력한 후 ChangeStreamShell1 로 전환하여 다음과 유사한 변경 스트림 출력을 확인합니다.
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
스크립트를 중지하려면 Ctrl+C
을(를) 누릅니다.
이 단계가 끝나면 변경 스트림 이벤트 를 성공적으로 트리거하고 관찰한 것입니다.
필터링된 변경 스트림 열기
변경 스트림 에 집계 파이프라인 을 전달하여 필터하다 를 적용 할 수 있습니다.
ChangeStreamShell1 에서 새 Python 스크립트를 만들어 PyMongo 드라이버를 사용하여 필터링된 변경 스트림을 엽니다.
nano pipeline.py
다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Python 스크립트를 실행합니다.
python3 pipeline.py
스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
필터링된 변경 스트림 관찰
mongosh
를 사용하여 MongoDB 에 연결해야 하는 ChangeStreamShell2 세션으로 돌아갑니다.
프롬프트에서 다음 명령을 입력합니다.
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
스크립트 출력에 표시된 대로 변경 스트림은 다음 파이프라인과 일치하기 때문에 변경 이벤트를 생성합니다.
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
ChangeStreamShell2 에 다음 문서를 삽입하여 문서가 필터하다 와 일치할 때만 변경 스트림 이 이벤트를 생성하는지 확인합니다.
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(선택 사항) Docker 컨테이너 중지
이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.
팁
더 많은 튜토리얼
MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.
실행할 제거 작업에 해당하는 탭을 선택합니다.
다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.
docker-compose -p mongo-kafka down --rmi all
다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.
docker-compose -p mongo-kafka down
컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.
요약
이 튜토리얼에서는 MongoDB에서 변경 스트림을 생성하고 출력을 관찰했습니다. MongoDB Kafka 소스 connector 는 사용자가 구성한 변경 스트림에서 변경 이벤트를 읽고 이를 Kafka 주제에 씁니다.
소스 에 대한 변경 스트림 및 주제를 구성하는 Kafka connector 방법을 알아보려면 MongoDB Kafka 소스 시작하기 connector 튜토리얼을 참조하세요.
자세히 알아보기
이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.