MongoDB Change Streams 살펴보기
이 페이지의 내용
이 튜토리얼을 따라 MongoDB 컬렉션에서 변경 스트림을 생성하고 생성하는 변경 이벤트를 관찰하는 방법을 알아보세요.
Change Streams 살펴보기
튜토리얼 설정 완료
Confluent Kafka Connect 및 MongoDB 환경을 시작하려면 Kafka Connector 튜토리얼 설정의 단계를 완료하세요.
Docker 컨테이너에 연결
Container shell 튜토리얼에서 대화형 세션 2개를 Docker 각각 별도의 창에 생성합니다.
터미널에서 대화식 셸을 시작하려면 다음 명령을 실행합니다.
docker exec -it mongo1 /bin/bash
이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell1이라고 합니다.
대화형 셸을 시작하려면 두 번째 터미널에서 다음 명령을 실행합니다.
docker exec -it mongo1 /bin/bash
이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell2이라고 합니다.
변경 스트림 열기
ChangeStreamShell1 에서 PyMongo 드라이버를 사용하여 변경 스트림을 여는 Python 스크립트를 만듭니다.
nano openchangestream.py
다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Python 스크립트를 실행합니다.
python3 openchangestream.py
스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
변경 이벤트 trigger
ChangeStreamShell2 MongoDB 에서 mongosh
MongoDB shell다음 명령을 사용하여 인 을(를) 사용하여 에 연결합니다.
mongosh "mongodb://mongo1"
연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.
rs0 [direct: primary] test>
프롬프트에서 다음 명령을 입력합니다.
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
앞의 명령을 입력한 후 ChangeStreamShell1 로 전환하여 다음과 유사한 변경 스트림 출력을 확인합니다.
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
스크립트를 중지하려면 Ctrl+C
를 누릅니다.
이 단계가 끝나면 변경 스트림 이벤트 를 성공적으로 트리거하고 관찰한 것입니다.
필터링된 변경 스트림 열기
변경 스트림 에 집계 파이프라인 을 전달하여 필터하다 를 적용 할 수 있습니다.
ChangeStreamShell1 에서 새 Python 스크립트를 만들어 PyMongo 드라이버를 사용하여 필터링된 변경 스트림을 엽니다.
nano pipeline.py
다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Python 스크립트를 실행합니다.
python3 pipeline.py
스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
필터링된 변경 스트림 관찰
mongosh
를 사용하여 MongoDB 에 연결해야 하는 ChangeStreamShell2 세션으로 돌아갑니다.
프롬프트에서 다음 명령을 입력합니다.
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
스크립트 출력에 표시된 대로 변경 스트림은 다음 파이프라인과 일치하기 때문에 변경 이벤트를 생성합니다.
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
ChangeStreamShell2 에 다음 문서를 삽입하여 문서가 필터하다 와 일치할 때만 변경 스트림 이 이벤트를 생성하는지 확인합니다.
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(선택 사항) Docker 컨테이너 중지
이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.
팁
더 많은 튜토리얼
MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.
실행할 제거 작업에 해당하는 탭을 선택합니다.
다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.
docker-compose -p mongo-kafka down --rmi all
다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.
docker-compose -p mongo-kafka down
컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.
요약
이 튜토리얼에서는 MongoDB에서 변경 스트림을 생성하고 출력을 관찰했습니다. MongoDB Kafka 소스 connector 는 사용자가 구성한 변경 스트림에서 변경 이벤트를 읽고 이를 Kafka 주제에 씁니다.
소스 에 대한 변경 스트림 및 주제를 구성하는 Kafka connector 방법을 알아보려면 MongoDB Kafka 소스 시작하기 connector 튜토리얼을 참조하세요.
자세히 알아보기
이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.