Change Stream ์ด๊ธฐ
์ด ํ์ด์ง์ ๋ด์ฉ
๊ฐ์
์ด ๊ฐ์ด๋์์๋ change stream์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ค์๊ฐ ๋ณ๊ฒฝ ์ฌํญ์ ๋ชจ๋ํฐ๋งํ๋ ๋ฐฉ๋ฒ์ ๋ฐฐ์ธ ์ ์์ต๋๋ค. change stream์ ์ ํ๋ฆฌ์ผ์ด์ ์ด ๋จ์ผ collection, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋๋ ๋ฐฐํฌ์์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ฌํญ์ ๊ตฌ๋ ํ ์ ์๋๋ก ํ๋ MongoDB Server ๊ธฐ๋ฅ์ ๋๋ค.
์ ๊ทธ๋ฆฌ๊ฒ์ด์ ์ฐ์ฐ์ ์ธํธ๋ฅผ ์ง์ ํ์ฌ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์์ ํ๋ ๋ฐ์ดํฐ๋ฅผ ํํฐ๋งํ๊ณ ๋ณํํ ์ ์์ต๋๋ค. MongoDB deployment v6.0 ์ด์์ ์ฐ๊ฒฐํ ๋ ๋ณ๊ฒฝ ์ ํ์ ๋ฌธ์ ๋ฐ์ดํฐ๋ฅผ ํฌํจํ๋๋ก ์ด๋ฒคํธ๋ฅผ ๊ตฌ์ฑํ ์๋ ์์ต๋๋ค.
๋ค์ ์น์ ์์ change stream์ ์ด๊ณ ๊ตฌ์ฑํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด์ธ์.
Change Stream ์ด๊ธฐ
change stream์ ์ด์ด ํน์ ์ ํ์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ฌํญ์ ๊ตฌ๋ ํ๊ณ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์์ฑํ ์ ์์ต๋๋ค.
change stream์ ์ด๋ ค๋ฉด watch()
MongoCollection
MongoDatabase
instance, ๋๋ ์ธ์คํด์ค์์ ๋ฉ์๋๋ฅผ MongoClient
ํธ์ถํฉ๋๋ค.
์ค์
๋ ๋ฆฝํ MongoDB ๋ฐฐํฌ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ง์ํ์ง ์๋๋ฐ, ์ด๋ ์ด ๊ธฐ๋ฅ์ ๋ณต์ ๋ณธ ์ธํธ oplog๊ฐ ํ์ํ๊ธฐ ๋๋ฌธ์ ๋๋ค. oplog ์ ๋ํด ์์ธํ ์์๋ณด๋ ค๋ฉด ๋ณต์ ๋ณธ ์ธํธ oplog MongoDB Server ๋งค๋ด์ผ ํ์ด์ง๋ฅผ ์ฐธ์กฐํ์ธ์.
watch()
๋ฉ์๋๋ฅผ ํธ์ถํ๋ ๊ฐ์ฒด์ ๋ฐ๋ผ change stream์ด ์์ ํ๋ ์ด๋ฒคํธ์ ๋ฒ์๊ฐ ๊ฒฐ์ ๋ฉ๋๋ค.
MongoCollection
์์ watch()
๋ฅผ ํธ์ถํ๋ฉด change stream์ด collection์ ๋ชจ๋ํฐ๋งํฉ๋๋ค.
MongoDatabase
์์ watch()
๋ฅผ ํธ์ถํ๋ฉด change stream์ด ํด๋น ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ชจ๋ collection์ ๋ชจ๋ํฐ๋งํฉ๋๋ค.
MongoClient
์์ watch()
๋ฅผ ํธ์ถํ๋ฉด change stream์ ์ฐ๊ฒฐ๋ MongoDB์ ๋ชจ๋ ๋ณ๊ฒฝ ์ฌํญ์ ๋ชจ๋ํฐ๋งํฉ๋๋ค.
์์
์ด ์์ ์์๋ myColl
collection์์ change stream์ ์ด๊ณ change stream ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ๋ ์ถ๋ ฅํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ์ค๋๋ค.
๋๋ผ์ด๋ฒ๋ ChangeStreamIterable
์ ํ์ ๋ณ์์ change stream ์ด๋ฒคํธ๋ฅผ ์ ์ฅํฉ๋๋ค. ๋ค์ ์์ ์์๋ ๋๋ผ์ด๋ฒ๊ฐ ChangeStreamIterable
๊ฐ์ฒด๋ฅผ Document
์ ํ์ผ๋ก ์ฑ์์ผ ํ๋ค๊ณ ์ง์ ํฉ๋๋ค. ๊ฒฐ๊ณผ์ ์ผ๋ก ๋๋ผ์ด๋ฒ๋ ๊ฐ๋ณ change stream ์ด๋ฒคํธ๋ฅผ ChangeStreamDocument
๊ฐ์ฒด๋ก ์ ์ฅํฉ๋๋ค.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
collection์ ๋ํ ์ฝ์ ์์ ์ ๋ค์๊ณผ ๊ฐ์ ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค.
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
์คํ ๊ฐ๋ฅํ ์์ ๋ Watch for Changes ์ฌ์ฉ ์์ ํ์ด์ง๋ฅผ ์ฐธ์กฐํ์ธ์.
watch()
๋ฉ์๋์ ๋ํด ์์ธํ ์์๋ณด๋ ค๋ฉด ๋ค์ API ๋ฌธ์๋ฅผ ์ฐธ์กฐํ์ธ์.
change stream์ ์ ๊ทธ๋ฆฌ๊ฒ์ด์ ์ฐ์ฐ์ ์ ์ฉ
์ง๊ณ ํ์ดํ๋ผ์ธ์ watch()
๋ฉ์๋์ ๋งค๊ฐ๋ณ์๋ก ์ ๋ฌํ์ฌ change stream์ด ์์ ํ ์ด๋ฒคํธ๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค.
์ฌ์ฉ ์ค์ธ MongoDB Server ๋ฒ์ ์ด ์ง์ํ๋ ์ ๊ทธ๋ฆฌ๊ฒ์ด์ ์ฐ์ฐ์๋ฅผ ์์๋ณด๋ ค๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ ์์ ์ ์ฐธ์กฐํ์ธ์.
์์
๋ค์ ์ฝ๋ ์์ ์์๋ ์ง๊ณ ํ์ดํ๋ผ์ธ์ ์ ์ฉํ์ฌ ์ฝ์ ๋ฐ ์ ๋ฐ์ดํธ ์์ ์ ๋ํด์๋ง ์ด๋ฒคํธ๋ฅผ ์์ ํ๋๋ก change stream์ ๊ตฌ์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ์ค๋๋ค.
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
collection์ ๋ํ ์ ๋ฐ์ดํธ ์์ ์ ๋ค์๊ณผ ๊ฐ์ ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
๋๊ท๋ชจ change stream ์ด๋ฒคํธ ๋ถํ
MongoDB 7.0๋ถํฐ๋ $changeStreamSplitLargeEvent
์ ๊ทธ๋ฆฌ๊ฒ์ด์
๋จ๊ณ๋ฅผ ์ฌ์ฉํ์ฌ 16MB๋ฅผ ์ด๊ณผํ๋ ์ด๋ฒคํธ๋ฅผ ๋ ์์ ์กฐ๊ฐ์ผ๋ก ๋ถํ ํ ์ ์์ต๋๋ค.
๊ผญ ํ์ํ ๊ฒฝ์ฐ์๋ง $changeStreamSplitLargeEvent
๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์ ํ๋ฆฌ์ผ์ด์
์ ์ ์ฒด ๋ฌธ์ ์ฌ์ ๋๋ ์ฌํ ์ด๋ฏธ์ง๊ฐ ํ์ํ๊ณ 16MB๋ฅผ ์ด๊ณผํ๋ ์ด๋ฒคํธ๋ฅผ ์์ฑํ๋ ๊ฒฝ์ฐ $changeStreamSplitLargeEvent
๋ฅผ ์ฌ์ฉํฉ๋๋ค.
$changeStreamSplitLargeEvent ๋จ๊ณ๋ ํ๋๊ทธ๋จผํธ๋ฅผ ์์ฐจ์ ์ผ๋ก ๋ฐํํฉ๋๋ค. change stream ์ปค์๋ฅผ ์ฌ์ฉํ์ฌ ํ๋๊ทธ๋จผํธ์ ์ก์ธ์คํ ์ ์์ต๋๋ค. ๊ฐ ํ๋๊ทธ๋จผํธ์๋ ๋ค์ ํ๋๋ฅผ ํฌํจํ๋ SplitEvent
๊ฐ์ฒด๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
ํ๋ | ์ค๋ช
|
---|---|
fragment | ์์ ์์ํ๋ ํ๋๊ทธ๋จผํธ์ ์ธ๋ฑ์ค 1 |
of | ๋ถํ ์ด๋ฒคํธ๋ฅผ ๊ตฌ์ฑํ๋ ์ด ํ๋๊ทธ๋จผํธ ์ |
๋ค์ ์์์๋ $changeStreamSplitLargeEvent
์ ๊ทธ๋ฆฌ๊ฒ์ด์
๋จ๊ณ๋ฅผ ์ฌ์ฉํ์ฌ ๋๊ท๋ชจ ์ด๋ฒคํธ๋ฅผ splitํ์ฌ change stream์ ์์ ํฉ๋๋ค.
ChangeStreamIterable<Document> changeStream = collection.watch( Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
์ฐธ๊ณ
์ง๊ณ ํ์ดํ๋ผ์ธ์๋ $changeStreamSplitLargeEvent
๋จ๊ณ๊ฐ ํ๋๋ง ์์ ์ ์์ผ๋ฉฐ ์ด ๋จ๊ณ๊ฐ ํ์ดํ๋ผ์ธ์ ๋ง์ง๋ง ๋จ๊ณ์ฌ์ผ ํฉ๋๋ค.
๋ค์ ์์ ๊ฐ์ด change stream ์ปค์์์ getSplitEvent()
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ SplitEvent
์ ์ก์ธ์คํ ์ ์์ต๋๋ค.
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
$changeStreamSplitLargeEvent
์ ๊ทธ๋ฆฌ๊ฒ์ด์
๋จ๊ณ์ ๋ํ ์์ธํ ๋ด์ฉ์ $changeStreamSplitLargeEvent ์๋ฒ ์ค๋ช
์๋ฅผ ์ฐธ์กฐํ์ธ์.
์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง ํฌํจํ๊ธฐ
๋ค์ ๋ฐ์ดํฐ๋ฅผ ํฌํจํ๊ฑฐ๋ ์๋ตํ๋๋ก ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
์ฌ์ ์ด๋ฏธ์ง ์์ ์ ์ ๋ฌธ์์ ๋ฒ์ ์ ๋ํ๋ด๋ ๋ฌธ์(์๋ ๊ฒฝ์ฐ)
์ฌํ ์ด๋ฏธ์ง ์์ ํ์ ๋ฌธ์์ ๋ฒ์ ์ ๋ํ๋ด๋ ๋ฌธ์(์๋ ๊ฒฝ์ฐ)
์ค์
๋ฐฐํฌ์์ MongoDB v6.0 ์ด์์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ์๋ง collection์์ ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ ์ ์์ต๋๋ค.
์ฌ์ ์ด๋ฏธ์ง ๋๋ ์ฌํ ์ด๋ฏธ์ง๊ฐ ํฌํจ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ๋ฅผ ์์ ํ๋ ค๋ฉด ๋ค์ ์์ ์ ์ํํด์ผ ํฉ๋๋ค.
MongoDB deployment์์ collection์ ๋ํ ์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํฉ๋๋ค.
ํ
๋ฐฐํฌ์์ ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์๋ฒ ์ค๋ช ์์ Change Streams with Document Pre- and Post-Images(์ด๋ฏธ์ง ์ฌ์ ๋ฐ ์ฌํ ๋ฌธ์ํ์ Change Streams)๋ฅผ ์ฐธ์กฐํ์ธ์.
์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง๊ฐ ํ์ฑํ๋ ์ปฌ๋ ์ ์ ์์ฑํ๋๋ก ๋๋ผ์ด๋ฒ์ ์ง์ํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด๋ ค๋ฉด ์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง๊ฐ ํ์ฑํ ๋ ์ปฌ๋ ์ ์์ฑ ์น์ ์ ์ฐธ์กฐํ์ธ์.
์ฌ์ ์ด๋ฏธ์ง์ ์ฌํ ์ด๋ฏธ์ง ์ค ํ๋ ๋๋ ๋ ๋ค๋ฅผ ๊ฒ์ํ๋๋ก change stream์ ๊ตฌ์ฑํฉ๋๋ค.
ํ
๋ณ๊ฒฝ ์ด๋ฒคํธ์ ์ฌ์ ์ด๋ฏธ์ง๋ฅผ ๊ธฐ๋กํ๋๋ก ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ๋ ค๋ฉด ์ฌ์ ์ด๋ฏธ์ง ๊ตฌ์ฑ ์์๋ฅผ ์ฐธ์กฐํ์ธ์.
๋ณ๊ฒฝ ์ด๋ฒคํธ์ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ๊ธฐ๋กํ๋๋ก ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ๋ ค๋ฉด ์ฌํ ์ด๋ฏธ์ง ๊ตฌ์ฑ ์์๋ฅผ ์ฐธ์กฐํ์ธ์.
์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ์ฌ collection ๋ง๋ค๊ธฐ
๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ์ฌ ์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง ์ต์
์ด ํ์ฑํ๋ collection์ ์์ฑํ๋ ค๋ฉด ChangeStreamPreAndPostImagesOptions
createCollection()
๋ค์ ์์ ๊ฐ์ด ์ธ์คํด์ค๋ฅผ ์ง์ ํ๊ณ ๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค.
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
MongoDB Shell์์ collMod
๋ช
๋ น์ ์คํํ์ฌ ๊ธฐ์กด ์ปฌ๋ ์
์ ์ฌ์ ์ด๋ฏธ์ง ๋ฐ ์ฌํ ์ด๋ฏธ์ง ์ต์
์ ๋ณ๊ฒฝํ ์ ์์ต๋๋ค. ์ด ์์
์ ์ํํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด๋ ค๋ฉด MongoDB Server ๋งค๋ด์ผ์ collMod ํญ๋ชฉ์ ์ฐธ์กฐํ์ธ์.
๊ฒฝ๊ณ
collection์์ ์ฌ์ ์ด๋ฏธ์ง ๋๋ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ ๊ฒฝ์ฐ collMod
์ผ)๋ก ์ด๋ฌํ ์ค์ ์ ์์ ํ๋ฉด ํด๋น collection์ ๊ธฐ์กด change stream์ด ์คํจํ ์ ์์ต๋๋ค.
์ฌ์ ์ด๋ฏธ์ง ๊ตฌ์ฑ ์์
๋ค์ ์ฝ๋ ์์์์๋ ์ฌ์ myColl
์ด๋ฏธ์ง๋ฅผ ํฌํจํ๊ณ ์ด๋ฒคํธ๋ฅผ ์ถ๋ ฅํ๋๋ก collection์์ change stream์ ๊ตฌ์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ์ค๋๋ค.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
์์ ์์์๋ FullDocumentBeforeChange.REQUIRED
์ต์
์ ์ฌ์ฉํ๋๋ก change stream์ ๊ตฌ์ฑํฉ๋๋ค. ์ด ์ต์
์ ๊ต์ฒด, ์
๋ฐ์ดํธ ๋ฐ ์ญ์ ์ด๋ฒคํธ์ ๋ํด ์ฌ์ ์ด๋ฏธ์ง๋ฅผ ์๊ตฌํ๋๋ก change stream์ ๊ตฌ์ฑํฉ๋๋ค. ์ฌ์ ์ด๋ฏธ์ง๋ฅผ ์ฌ์ฉํ ์ ์๋ ๊ฒฝ์ฐ ๋๋ผ์ด๋ฒ์์ ์ค๋ฅ๊ฐ ๋ฐ์ํฉ๋๋ค.
๋ฌธ์์ amount
ํ๋ ๊ฐ์ 150
์์ 2000
๋ก ์
๋ฐ์ดํธํ๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค. ์ด ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ ๋ค์๊ณผ ๊ฐ์ ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
์ต์ ๋ชฉ๋ก์ FullDocumentBeforeChange ๋ฅผ ์ฐธ์กฐํ์ธ์. API ๋ฌธ์.
์ฌํ ์ด๋ฏธ์ง ๊ตฌ์ฑ ์์
๋ค์ ์ฝ๋ ์์์์๋ ์ฌ์ myColl
์ด๋ฏธ์ง๋ฅผ ํฌํจํ๊ณ ์ด๋ฒคํธ๋ฅผ ์ถ๋ ฅํ๋๋ก collection์์ change stream์ ๊ตฌ์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ์ค๋๋ค.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
์์ ์์์๋ FullDocument.WHEN_AVAILABLE
์ต์
์ ์ฌ์ฉํ๋๋ก change stream์ ๊ตฌ์ฑํฉ๋๋ค. ์ด ์ต์
์ ์ฌ์ฉ ๊ฐ๋ฅํ ๊ฒฝ์ฐ ๊ต์ฒด ๋ฐ ์
๋ฐ์ดํธ ์ด๋ฒคํธ์ ๋ํด ์์ ๋ ๋ฌธ์์ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ๋ฐํํ๋๋ก change stream์ ๊ตฌ์ฑํฉ๋๋ค.
๋ฌธ์์ color
ํ๋ ๊ฐ์ "purple"
์์ "pink"
๋ก ์
๋ฐ์ดํธํ๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค. ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ ๋ค์๊ณผ ๊ฐ์ ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
์ต์ ๋ชฉ๋ก์ FullDocument ๋ฅผ ์ฐธ์กฐํ์ธ์. API ๋ฌธ์.