MongoDB Change Streams ์ดํด๋ณด๊ธฐ
์ด ํ์ด์ง์ ๋ด์ฉ
์ด ํํ ๋ฆฌ์ผ์ ๋ฐ๋ผ MongoDB ์ปฌ๋ ์ ์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ฑํ๊ณ ์์ฑํ๋ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ๊ด์ฐฐํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด์ธ์.
Change Streams ์ดํด๋ณด๊ธฐ
ํํ ๋ฆฌ์ผ ์ค์ ์๋ฃ
Kafka Connector ํํ ๋ฆฌ์ผ ์ค์ ์ ๋จ๊ณ๋ฅผ ์๋ฃํ์ฌ Confluent Kafka Connect ๋ฐ MongoDB ํ๊ฒฝ์ ์์ํฉ๋๋ค.
Docker ์ปจํ ์ด๋์ ์ฐ๊ฒฐ
Container shell ํํ ๋ฆฌ์ผ์์ ๋ํํ ์ธ์ 2๊ฐ๋ฅผ Docker ๊ฐ๊ฐ ๋ณ๋์ ์ฐฝ์ ์์ฑํฉ๋๋ค.
ํฐ๋ฏธ๋์์ ๋ํ์ ์ ธ์ ์์ํ๋ ค๋ฉด ๋ค์ ๋ช ๋ น์ ์คํํฉ๋๋ค.
docker exec -it mongo1 /bin/bash
์ด ํํ ๋ฆฌ์ผ ์ ์ฒด์์๋ ์ด ๋ํํ shell ์ ChangeStreamShell1์ด๋ผ๊ณ ํฉ๋๋ค.
๋ํํ ์ ธ์ ์์ํ๋ ค๋ฉด ๋ ๋ฒ์งธ ํฐ๋ฏธ๋์์ ๋ค์ ๋ช ๋ น์ ์คํํฉ๋๋ค.
docker exec -it mongo1 /bin/bash
์ด ํํ ๋ฆฌ์ผ ์ ์ฒด์์๋ ์ด ๋ํํ shell ์ ChangeStreamShell2์ด๋ผ๊ณ ํฉ๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ
ChangeStreamShell1 ์์ PyMongo ๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฌ๋ Python ์คํฌ๋ฆฝํธ๋ฅผ ๋ง๋ญ๋๋ค.
nano openchangestream.py
๋ค์ ์ฝ๋๋ฅผ ํ์ผ์ ๋ถ์ฌ๋ฃ๊ณ ๋ณ๊ฒฝ ์ฌํญ์ ์ ์ฅํฉ๋๋ค.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Python ์คํฌ๋ฆฝํธ๋ฅผ ์คํํฉ๋๋ค.
python3 openchangestream.py
์คํฌ๋ฆฝํธ๋ ์ฑ๊ณต์ ์ผ๋ก ์์๋ ํ ๋ค์ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํฉ๋๋ค.
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
๋ณ๊ฒฝ ์ด๋ฒคํธ trigger
ChangeStreamShell2 ์์ MongoDB Shell์ธ mongosh
์(๋ฅผ) ์ฌ์ฉํ์ฌ MongoDB์ ์ฐ๊ฒฐํ๊ณ ๋ค์ ๋ช
๋ น์ ์ฌ์ฉํฉ๋๋ค.
mongosh "mongodb://mongo1"
์ฐ๊ฒฐ์ ์ฑ๊ณตํ๋ฉด ๋ค์๊ณผ ๊ฐ์ MongoDB Shell ํ๋กฌํํธ๊ฐ ํ์๋ฉ๋๋ค.
rs0 [direct: primary] test>
ํ๋กฌํํธ์์ ๋ค์ ๋ช ๋ น์ ์ ๋ ฅํฉ๋๋ค.
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
์์ ๋ช ๋ น์ ์ ๋ ฅํ ํ ChangeStreamShell1 ๋ก ์ ํํ์ฌ ๋ค์๊ณผ ์ ์ฌํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ํ์ธํฉ๋๋ค.
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
์คํฌ๋ฆฝํธ๋ฅผ ์ค์งํ๋ ค๋ฉด Ctrl+C
์(๋ฅผ) ๋๋ฆ
๋๋ค.
์ด ๋จ๊ณ๊ฐ ๋๋๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ๋ฅผ ์ฑ๊ณต์ ์ผ๋ก ํธ๋ฆฌ๊ฑฐํ๊ณ ๊ด์ฐฐํ ๊ฒ์ ๋๋ค.
ํํฐ๋ง๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ ์ง๊ณ ํ์ดํ๋ผ์ธ ์ ์ ๋ฌํ์ฌ ํํฐํ๋ค ๋ฅผ ์ ์ฉ ํ ์ ์์ต๋๋ค.
ChangeStreamShell1 ์์ ์ Python ์คํฌ๋ฆฝํธ๋ฅผ ๋ง๋ค์ด PyMongo ๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ์ฌ ํํฐ๋ง๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฝ๋๋ค.
nano pipeline.py
๋ค์ ์ฝ๋๋ฅผ ํ์ผ์ ๋ถ์ฌ๋ฃ๊ณ ๋ณ๊ฒฝ ์ฌํญ์ ์ ์ฅํฉ๋๋ค.
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Python ์คํฌ๋ฆฝํธ๋ฅผ ์คํํฉ๋๋ค.
python3 pipeline.py
์คํฌ๋ฆฝํธ๋ ์ฑ๊ณต์ ์ผ๋ก ์์๋ ํ ๋ค์ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํฉ๋๋ค.
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
ํํฐ๋ง๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ๊ด์ฐฐ
mongosh
๋ฅผ ์ฌ์ฉํ์ฌ MongoDB ์ ์ฐ๊ฒฐํด์ผ ํ๋ ChangeStreamShell2 ์ธ์
์ผ๋ก ๋์๊ฐ๋๋ค.
ํ๋กฌํํธ์์ ๋ค์ ๋ช ๋ น์ ์ ๋ ฅํฉ๋๋ค.
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
์คํฌ๋ฆฝํธ ์ถ๋ ฅ์ ํ์๋ ๋๋ก ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ค์ ํ์ดํ๋ผ์ธ๊ณผ ์ผ์นํ๊ธฐ ๋๋ฌธ์ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์์ฑํฉ๋๋ค.
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
ChangeStreamShell2 ์ ๋ค์ ๋ฌธ์๋ฅผ ์ฝ์ ํ์ฌ ๋ฌธ์๊ฐ ํํฐํ๋ค ์ ์ผ์นํ ๋๋ง ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด ์ด๋ฒคํธ๋ฅผ ์์ฑํ๋์ง ํ์ธํฉ๋๋ค.
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(์ ํ ์ฌํญ) Docker ์ปจํ ์ด๋ ์ค์ง
์ด ํํ ๋ฆฌ์ผ์ ์๋ฃํ ํ Docker ์์ฐ์ ์ค์งํ๊ฑฐ๋ ์ ๊ฑฐํ์ฌ ์ปดํจํฐ์ ๋ฆฌ์์ค๋ฅผ ํ๋ณดํ์ธ์. Docker ์ปจํ ์ด๋์ ์ด๋ฏธ์ง๋ฅผ ๋ชจ๋ ์ ๊ฑฐํ๊ฑฐ๋ ์ปจํ ์ด๋๋ง ์ ๊ฑฐํ๋๋ก ์ ํํ ์ ์์ต๋๋ค. ์ปจํ ์ด๋์ ์ด๋ฏธ์ง๋ฅผ ์ ๊ฑฐํ ๊ฒฝ์ฐ ์ด๋ฅผ ๋ค์ ๋ค์ด๋ก๋ํ์ฌ ํฌ๊ธฐ๊ฐ ์ฝ 2.4GB์ธ MongoDB Kafka Connector ๊ฐ๋ฐ ํ๊ฒฝ์ ๋ค์ ์์ํด์ผ ํฉ๋๋ค. ์ปจํ ์ด๋๋ง ์ ๊ฑฐํ๋ฉด ์ด๋ฏธ์ง๋ฅผ ์ฌ์ฌ์ฉํ ์ ์๊ณ ์ํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์๋ ๋๋ถ๋ถ์ ๋์ฉ๋ ํ์ผ์ ๋ค์ด๋ก๋ํ์ง ์์๋ ๋ฉ๋๋ค.
ํ
๋ ๋ง์ ํํ ๋ฆฌ์ผ
MongoDB Kafka Connector ํํ ๋ฆฌ์ผ์ ๋ ์๋ฃํ๋ ค๋ ๊ฒฝ์ฐ ์ปจํ ์ด๋๋ง ์ ๊ฑฐํ๋ ๊ฒ์ด ์ข์ต๋๋ค. MongoDB Kafka Connector ํํ ๋ฆฌ์ผ์ ๋ ์ด์ ์๋ฃํ์ง ์์ผ๋ ค๋ฉด ์ปจํ ์ด๋ ๋ฐ ์ด๋ฏธ์ง๋ฅผ ์ ๊ฑฐํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
์คํํ ์ ๊ฑฐ ์์ ์ ํด๋นํ๋ ํญ์ ์ ํํฉ๋๋ค.
๋ค์ ์ ธ ๋ช ๋ น์ ์คํํ์ฌ ๊ฐ๋ฐ ํ๊ฒฝ์ ๋ํ Docker ์ปจํ ์ด๋์ ์ด๋ฏธ์ง๋ฅผ ์ ๊ฑฐํฉ๋๋ค.
docker-compose -p mongo-kafka down --rmi all
๋ค์ shell ๋ช ๋ น์ ์คํํ์ฌ Docker ์ปจํ ์ด๋๋ฅผ ์ ๊ฑฐํ๊ณ ๊ฐ๋ฐ ํ๊ฒฝ์ฉ ์ด๋ฏธ์ง๋ ์ ์งํฉ๋๋ค.
docker-compose -p mongo-kafka down
์ปจํ ์ด๋๋ฅผ ๋ค์ ์์ํ๋ ค๋ฉด ํํ ๋ฆฌ์ผ ์ค์ ์์ ์ปจํ ์ด๋๋ฅผ ์์ํ๋ ๋ฐ ํ์ํ ๊ฒ๊ณผ ๋์ผํ ๋จ๊ณ๋ฅผ ๋ฐ๋ฅด์ธ์.
์์ฝ
์ด ํํ ๋ฆฌ์ผ์์๋ MongoDB์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ฑํ๊ณ ์ถ๋ ฅ์ ๊ด์ฐฐํ์ต๋๋ค. MongoDB Kafka ์์ค connector ๋ ์ฌ์ฉ์๊ฐ ๊ตฌ์ฑํ ๋ณ๊ฒฝ ์คํธ๋ฆผ์์ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์ฝ๊ณ ์ด๋ฅผ Kafka ์ฃผ์ ์ ์๋๋ค.
์์ค ์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ฐ ์ฃผ์ ๋ฅผ ๊ตฌ์ฑํ๋ Kafka connector ๋ฐฉ๋ฒ์ ์์๋ณด๋ ค๋ฉด MongoDB Kafka ์์ค ์์ํ๊ธฐ connector ํํ ๋ฆฌ์ผ์ ์ฐธ์กฐํ์ธ์.
์์ธํ ์์๋ณด๊ธฐ
์ด ํํ ๋ฆฌ์ผ์์ ์ธ๊ธ๋ ๊ฐ๋ ์ ๋ํด ์์ธํ ์์๋ณด๋ ค๋ฉด ๋ค์ ๋ฆฌ์์ค๋ฅผ ์ฐธ์กฐํ์ธ์.