Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Change Streams 살펴보기

이 페이지의 내용

  • Change Streams 살펴보기
  • 요약
  • 자세히 알아보기

이 튜토리얼을 따라 MongoDB 컬렉션에서 변경 스트림을 생성하고 생성하는 변경 이벤트를 관찰하는 방법을 알아보세요.

1

Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.

2

Container shell 튜토리얼에서 대화형 세션 2개를 Docker 각각 별도의 창에 생성합니다.

터미널에서 대화식 셸을 시작하려면 다음 명령을 실행합니다.

docker exec -it mongo1 /bin/bash

이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell1이라고 합니다.

대화형 셸을 시작하려면 두 번째 터미널에서 다음 명령을 실행합니다.

docker exec -it mongo1 /bin/bash

이 튜토리얼 전체에서는 이 대화형 shell 을 ChangeStreamShell2이라고 합니다.

3

ChangeStreamShell1 에서 PyMongo 드라이버를 사용하여 변경 스트림을 여는 Python 스크립트를 만듭니다.

nano openchangestream.py

다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python 스크립트를 실행합니다.

python3 openchangestream.py

스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

ChangeStreamShell2 에서 MongoDB Shell인 mongosh 을(를) 사용하여 MongoDB에 연결하고 다음 명령을 사용합니다.

mongosh "mongodb://mongo1"

연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.

rs0 [direct: primary] test>

프롬프트에서 다음 명령을 입력합니다.

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

앞의 명령을 입력한 후 ChangeStreamShell1 로 전환하여 다음과 유사한 변경 스트림 출력을 확인합니다.

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

스크립트를 중지하려면 Ctrl+C 을(를) 누릅니다.

이 단계가 끝나면 변경 스트림 이벤트 를 성공적으로 트리거하고 관찰한 것입니다.

5

변경 스트림 에 집계 파이프라인 을 전달하여 필터하다 를 적용 할 수 있습니다.

ChangeStreamShell1 에서 새 Python 스크립트를 만들어 PyMongo 드라이버를 사용하여 필터링된 변경 스트림을 엽니다.

nano pipeline.py

다음 코드를 파일에 붙여넣고 변경 사항을 저장합니다.

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python 스크립트를 실행합니다.

python3 pipeline.py

스크립트는 성공적으로 시작된 후 다음 메시지를 출력합니다.

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

mongosh 를 사용하여 MongoDB 에 연결해야 하는 ChangeStreamShell2 세션으로 돌아갑니다.

프롬프트에서 다음 명령을 입력합니다.

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

스크립트 출력에 표시된 대로 변경 스트림은 다음 파이프라인과 일치하기 때문에 변경 이벤트를 생성합니다.

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

ChangeStreamShell2 에 다음 문서를 삽입하여 문서가 필터하다 와 일치할 때만 변경 스트림 이 이벤트를 생성하는지 확인합니다.

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.

더 많은 튜토리얼

MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.

실행할 제거 작업에 해당하는 탭을 선택합니다.

다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.

docker-compose -p mongo-kafka down --rmi all

다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.

docker-compose -p mongo-kafka down

컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.

이 튜토리얼에서는 MongoDB에서 변경 스트림을 생성하고 출력을 관찰했습니다. MongoDB Kafka 소스 connector 는 사용자가 구성한 변경 스트림에서 변경 이벤트를 읽고 이를 Kafka 주제에 씁니다.

소스 에 대한 변경 스트림 및 주제를 구성하는 Kafka connector 방법을 알아보려면 MongoDB Kafka 소스 시작하기 connector 튜토리얼을 참조하세요.

이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

  • Change Streams 및 소스 connector

  • 변경 스트림 출력 수정

  • MongoDB Shell(몽고시)

돌아가기

튜토리얼 설정