Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Change Streams ์‚ดํŽด๋ณด๊ธฐ

์ด ํŽ˜์ด์ง€์˜ ๋‚ด์šฉ

  • Change Streams ์‚ดํŽด๋ณด๊ธฐ
  • ์š”์•ฝ
  • ์ž์„ธํžˆ ์•Œ์•„๋ณด๊ธฐ

์ด ํŠœํ† ๋ฆฌ์–ผ์„ ๋”ฐ๋ผ MongoDB ์ปฌ๋ ‰์…˜์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•˜๊ณ  ์ƒ์„ฑํ•˜๋Š” ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ๊ด€์ฐฐํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด์„ธ์š”.

1

Kafka Connector ํŠœํ† ๋ฆฌ์–ผ ์„ค์ • ์˜ ๋‹จ๊ณ„๋ฅผ ์™„๋ฃŒํ•˜์—ฌ Confluent Kafka Connect ๋ฐ MongoDB ํ™˜๊ฒฝ์„ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค.

2

Container shell ํŠœํ† ๋ฆฌ์–ผ์—์„œ ๋Œ€ํ™”ํ˜• ์„ธ์…˜ 2๊ฐœ๋ฅผ Docker ๊ฐ๊ฐ ๋ณ„๋„์˜ ์ฐฝ์— ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

ํ„ฐ๋ฏธ๋„์—์„œ ๋Œ€ํ™”์‹ ์…ธ์„ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ๋‹ค์Œ ๋ช…๋ น์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

docker exec -it mongo1 /bin/bash

์ด ํŠœํ† ๋ฆฌ์–ผ ์ „์ฒด์—์„œ๋Š” ์ด ๋Œ€ํ™”ํ˜• shell ์„ ChangeStreamShell1์ด๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

๋Œ€ํ™”ํ˜• ์…ธ์„ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ๋‘ ๋ฒˆ์งธ ํ„ฐ๋ฏธ๋„์—์„œ ๋‹ค์Œ ๋ช…๋ น์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

docker exec -it mongo1 /bin/bash

์ด ํŠœํ† ๋ฆฌ์–ผ ์ „์ฒด์—์„œ๋Š” ์ด ๋Œ€ํ™”ํ˜• shell ์„ ChangeStreamShell2์ด๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

3

ChangeStreamShell1 ์—์„œ PyMongo ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ฌ๋Š” Python ์Šคํฌ๋ฆฝํŠธ๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

nano openchangestream.py

๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ํŒŒ์ผ์— ๋ถ™์—ฌ๋„ฃ๊ณ  ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

python3 openchangestream.py

์Šคํฌ๋ฆฝํŠธ๋Š” ์„ฑ๊ณต์ ์œผ๋กœ ์‹œ์ž‘๋œ ํ›„ ๋‹ค์Œ ๋ฉ”์‹œ์ง€๋ฅผ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

ChangeStreamShell2 ์—์„œ MongoDB Shell์ธ mongosh ์„(๋ฅผ) ์‚ฌ์šฉํ•˜์—ฌ MongoDB์— ์—ฐ๊ฒฐํ•˜๊ณ  ๋‹ค์Œ ๋ช…๋ น์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

mongosh "mongodb://mongo1"

์—ฐ๊ฒฐ์— ์„ฑ๊ณตํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ MongoDB Shell ํ”„๋กฌํ”„ํŠธ๊ฐ€ ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค.

rs0 [direct: primary] test>

ํ”„๋กฌํ”„ํŠธ์—์„œ ๋‹ค์Œ ๋ช…๋ น์„ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค.

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

์•ž์˜ ๋ช…๋ น์„ ์ž…๋ ฅํ•œ ํ›„ ChangeStreamShell1 ๋กœ ์ „ํ™˜ํ•˜์—ฌ ๋‹ค์Œ๊ณผ ์œ ์‚ฌํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

์Šคํฌ๋ฆฝํŠธ๋ฅผ ์ค‘์ง€ํ•˜๋ ค๋ฉด Ctrl+C ์„(๋ฅผ) ๋ˆ„๋ฆ…๋‹ˆ๋‹ค.

์ด ๋‹จ๊ณ„๊ฐ€ ๋๋‚˜๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ ๋ฅผ ์„ฑ๊ณต์ ์œผ๋กœ ํŠธ๋ฆฌ๊ฑฐํ•˜๊ณ  ๊ด€์ฐฐํ•œ ๊ฒƒ์ž…๋‹ˆ๋‹ค.

5

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์— ์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ ์„ ์ „๋‹ฌํ•˜์—ฌ ํ•„ํ„ฐํ•˜๋‹ค ๋ฅผ ์ ์šฉ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ChangeStreamShell1 ์—์„œ ์ƒˆ Python ์Šคํฌ๋ฆฝํŠธ๋ฅผ ๋งŒ๋“ค์–ด PyMongo ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ•„ํ„ฐ๋ง๋œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ฝ๋‹ˆ๋‹ค.

nano pipeline.py

๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ํŒŒ์ผ์— ๋ถ™์—ฌ๋„ฃ๊ณ  ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

python3 pipeline.py

์Šคํฌ๋ฆฝํŠธ๋Š” ์„ฑ๊ณต์ ์œผ๋กœ ์‹œ์ž‘๋œ ํ›„ ๋‹ค์Œ ๋ฉ”์‹œ์ง€๋ฅผ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

mongosh ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ MongoDB ์— ์—ฐ๊ฒฐํ•ด์•ผ ํ•˜๋Š” ChangeStreamShell2 ์„ธ์…˜์œผ๋กœ ๋Œ์•„๊ฐ‘๋‹ˆ๋‹ค.

ํ”„๋กฌํ”„ํŠธ์—์„œ ๋‹ค์Œ ๋ช…๋ น์„ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค.

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

์Šคํฌ๋ฆฝํŠธ ์ถœ๋ ฅ์— ํ‘œ์‹œ๋œ ๋Œ€๋กœ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ๊ณผ ์ผ์น˜ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

ChangeStreamShell2 ์— ๋‹ค์Œ ๋ฌธ์„œ๋ฅผ ์‚ฝ์ž…ํ•˜์—ฌ ๋ฌธ์„œ๊ฐ€ ํ•„ํ„ฐํ•˜๋‹ค ์™€ ์ผ์น˜ํ•  ๋•Œ๋งŒ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•˜๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

์ด ํŠœํ† ๋ฆฌ์–ผ์„ ์™„๋ฃŒํ•œ ํ›„ Docker ์ž์‚ฐ์„ ์ค‘์ง€ํ•˜๊ฑฐ๋‚˜ ์ œ๊ฑฐํ•˜์—ฌ ์ปดํ“จํ„ฐ์˜ ๋ฆฌ์†Œ์Šค๋ฅผ ํ™•๋ณดํ•˜์„ธ์š”. Docker ์ปจํ…Œ์ด๋„ˆ์™€ ์ด๋ฏธ์ง€๋ฅผ ๋ชจ๋‘ ์ œ๊ฑฐํ•˜๊ฑฐ๋‚˜ ์ปจํ…Œ์ด๋„ˆ๋งŒ ์ œ๊ฑฐํ•˜๋„๋ก ์„ ํƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ปจํ…Œ์ด๋„ˆ์™€ ์ด๋ฏธ์ง€๋ฅผ ์ œ๊ฑฐํ•œ ๊ฒฝ์šฐ ์ด๋ฅผ ๋‹ค์‹œ ๋‹ค์šด๋กœ๋“œํ•˜์—ฌ ํฌ๊ธฐ๊ฐ€ ์•ฝ 2.4GB์ธ MongoDB Kafka Connector ๊ฐœ๋ฐœ ํ™˜๊ฒฝ์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ปจํ…Œ์ด๋„ˆ๋งŒ ์ œ๊ฑฐํ•˜๋ฉด ์ด๋ฏธ์ง€๋ฅผ ์žฌ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๊ณ  ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์— ์žˆ๋Š” ๋Œ€๋ถ€๋ถ„์˜ ๋Œ€์šฉ๋Ÿ‰ ํŒŒ์ผ์„ ๋‹ค์šด๋กœ๋“œํ•˜์ง€ ์•Š์•„๋„ ๋ฉ๋‹ˆ๋‹ค.

ํŒ

๋” ๋งŽ์€ ํŠœํ† ๋ฆฌ์–ผ

MongoDB Kafka Connector ํŠœํ† ๋ฆฌ์–ผ์„ ๋” ์™„๋ฃŒํ•˜๋ ค๋Š” ๊ฒฝ์šฐ ์ปจํ…Œ์ด๋„ˆ๋งŒ ์ œ๊ฑฐํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค. MongoDB Kafka Connector ํŠœํ† ๋ฆฌ์–ผ์„ ๋” ์ด์ƒ ์™„๋ฃŒํ•˜์ง€ ์•Š์œผ๋ ค๋ฉด ์ปจํ…Œ์ด๋„ˆ ๋ฐ ์ด๋ฏธ์ง€๋ฅผ ์ œ๊ฑฐํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค.

์‹คํ–‰ํ•  ์ œ๊ฑฐ ์ž‘์—…์— ํ•ด๋‹นํ•˜๋Š” ํƒญ์„ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์…ธ ๋ช…๋ น์„ ์‹คํ–‰ํ•˜์—ฌ ๊ฐœ๋ฐœ ํ™˜๊ฒฝ์— ๋Œ€ํ•œ Docker ์ปจํ…Œ์ด๋„ˆ์™€ ์ด๋ฏธ์ง€๋ฅผ ์ œ๊ฑฐํ•ฉ๋‹ˆ๋‹ค.

docker-compose -p mongo-kafka down --rmi all

๋‹ค์Œ shell ๋ช…๋ น์„ ์‹คํ–‰ํ•˜์—ฌ Docker ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ œ๊ฑฐํ•˜๊ณ  ๊ฐœ๋ฐœ ํ™˜๊ฒฝ์šฉ ์ด๋ฏธ์ง€๋Š” ์œ ์ง€ํ•ฉ๋‹ˆ๋‹ค.

docker-compose -p mongo-kafka down

์ปจํ…Œ์ด๋„ˆ๋ฅผ ๋‹ค์‹œ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ํŠœํ† ๋ฆฌ์–ผ ์„ค์ •์—์„œ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์‹œ์ž‘ํ•˜๋Š” ๋ฐ ํ•„์š”ํ•œ ๊ฒƒ๊ณผ ๋™์ผํ•œ ๋‹จ๊ณ„๋ฅผ ๋”ฐ๋ฅด์„ธ์š”.

์ด ํŠœํ† ๋ฆฌ์–ผ์—์„œ๋Š” MongoDB์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•˜๊ณ  ์ถœ๋ ฅ์„ ๊ด€์ฐฐํ–ˆ์Šต๋‹ˆ๋‹ค. MongoDB Kafka ์†Œ์Šค connector ๋Š” ์‚ฌ์šฉ์ž๊ฐ€ ๊ตฌ์„ฑํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์—์„œ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ์ฝ๊ณ  ์ด๋ฅผ Kafka ์ฃผ์ œ์— ์”๋‹ˆ๋‹ค.

์†Œ์Šค ์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๋ฐ ์ฃผ์ œ๋ฅผ ๊ตฌ์„ฑํ•˜๋Š” Kafka connector ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด๋ ค๋ฉด MongoDB Kafka ์†Œ์Šค ์‹œ์ž‘ํ•˜๊ธฐ connector ํŠœํ† ๋ฆฌ์–ผ์„ ์ฐธ์กฐํ•˜์„ธ์š”.

์ด ํŠœํ† ๋ฆฌ์–ผ์—์„œ ์–ธ๊ธ‰๋œ ๊ฐœ๋…์— ๋Œ€ํ•ด ์ž์„ธํžˆ ์•Œ์•„๋ณด๋ ค๋ฉด ๋‹ค์Œ ๋ฆฌ์†Œ์Šค๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

  • Change Streams ๋ฐ ์†Œ์Šค connector

  • ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ ์ˆ˜์ •

  • MongoDB Shell(๋ชฝ๊ณ ์‹œ)

๋Œ์•„๊ฐ€๊ธฐ

ํŠœํ† ๋ฆฌ์–ผ ์„ค์ •