Docs Menu
Docs Home
/ / /
Java ๋™๊ธฐํ™” ๋“œ๋ผ์ด๋ฒ„
/ / /

Change Stream ์—ด๊ธฐ

์ด ํŽ˜์ด์ง€์˜ ๋‚ด์šฉ

  • ๊ฐœ์š”
  • Change Stream ์—ด๊ธฐ
  • change stream์— ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ์—ฐ์‚ฐ์ž ์ ์šฉ
  • ๋Œ€๊ทœ๋ชจ change stream ์ด๋ฒคํŠธ ๋ถ„ํ• 
  • ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ํฌํ•จํ•˜๊ธฐ

์ด ๊ฐ€์ด๋“œ์—์„œ๋Š” change stream์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ์‹ค์‹œ๊ฐ„ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ฐฐ์šธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. change stream์€ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ๋‹จ์ผ collection, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋˜๋Š” ๋ฐฐํฌ์—์„œ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๊ตฌ๋…ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” MongoDB Server ๊ธฐ๋Šฅ์ž…๋‹ˆ๋‹ค.

์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ์—ฐ์‚ฐ์ž ์„ธํŠธ๋ฅผ ์ง€์ •ํ•˜์—ฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์ˆ˜์‹ ํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ํ•„ํ„ฐ๋งํ•˜๊ณ  ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. MongoDB deployment v6.0 ์ด์ƒ์— ์—ฐ๊ฒฐํ•  ๋•Œ ๋ณ€๊ฒฝ ์ „ํ›„์˜ ๋ฌธ์„œ ๋ฐ์ดํ„ฐ๋ฅผ ํฌํ•จํ•˜๋„๋ก ์ด๋ฒคํŠธ๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์„น์…˜์—์„œ change stream์„ ์—ด๊ณ  ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด์„ธ์š”.

  • Change Stream ์—ด๊ธฐ

  • change stream์— ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ์—ฐ์‚ฐ์ž ์ ์šฉ

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ํฌํ•จํ•˜๊ธฐ

change stream์„ ์—ด์–ด ํŠน์ • ์œ ํ˜•์˜ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๊ตฌ๋…ํ•˜๊ณ  ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

change stream์„ ์—ด๋ ค๋ฉด watch() MongoCollectionMongoDatabaseinstance, ๋˜๋Š” ์ธ์Šคํ„ด์Šค์—์„œ ๋ฉ”์„œ๋“œ๋ฅผ MongoClient ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

์ค‘์š”

๋…๋ฆฝํ˜• MongoDB ๋ฐฐํฌ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์ง€์›ํ•˜์ง€ ์•Š๋Š”๋ฐ, ์ด๋Š” ์ด ๊ธฐ๋Šฅ์— ๋ณต์ œ๋ณธ ์„ธํŠธ oplog๊ฐ€ ํ•„์š”ํ•˜๊ธฐ ๋•Œ๋ฌธ์ž…๋‹ˆ๋‹ค. oplog ์— ๋Œ€ํ•ด ์ž์„ธํžˆ ์•Œ์•„๋ณด๋ ค๋ฉด ๋ณต์ œ๋ณธ ์„ธํŠธ oplog MongoDB Server ๋งค๋‰ด์–ผ ํŽ˜์ด์ง€๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

watch() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ๊ฐ์ฒด์— ๋”ฐ๋ผ change stream์ด ์ˆ˜์‹ ํ•˜๋Š” ์ด๋ฒคํŠธ์˜ ๋ฒ”์œ„๊ฐ€ ๊ฒฐ์ •๋ฉ๋‹ˆ๋‹ค.

MongoCollection ์—์„œ watch() ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด change stream์ด collection์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค.

MongoDatabase ์—์„œ watch() ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด change stream์ด ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ๋ชจ๋“  collection์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค.

MongoClient์—์„œ watch() ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด change stream์€ ์—ฐ๊ฒฐ๋œ MongoDB์˜ ๋ชจ๋“  ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค.

์ด ์˜ˆ์ œ์—์„œ๋Š” myColl collection์—์„œ change stream์„ ์—ด๊ณ  change stream ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•  ๋•Œ ์ถœ๋ ฅํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ ์ค๋‹ˆ๋‹ค.

๋“œ๋ผ์ด๋ฒ„๋Š” ChangeStreamIterable ์œ ํ˜•์˜ ๋ณ€์ˆ˜์— change stream ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค. ๋‹ค์Œ ์˜ˆ์ œ์—์„œ๋Š” ๋“œ๋ผ์ด๋ฒ„๊ฐ€ ChangeStreamIterable ๊ฐ์ฒด๋ฅผ Document ์œ ํ˜•์œผ๋กœ ์ฑ„์›Œ์•ผ ํ•œ๋‹ค๊ณ  ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค. ๊ฒฐ๊ณผ์ ์œผ๋กœ ๋“œ๋ผ์ด๋ฒ„๋Š” ๊ฐœ๋ณ„ change stream ์ด๋ฒคํŠธ๋ฅผ ChangeStreamDocument ๊ฐ์ฒด๋กœ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

collection์— ๋Œ€ํ•œ ์‚ฝ์ž… ์ž‘์—…์€ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ถœ๋ ฅ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

์‹คํ–‰ ๊ฐ€๋Šฅํ•œ ์˜ˆ์ œ๋Š” Watch for Changes ์‚ฌ์šฉ ์˜ˆ์ œ ํŽ˜์ด์ง€๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

watch() ๋ฉ”์„œ๋“œ์— ๋Œ€ํ•ด ์ž์„ธํžˆ ์•Œ์•„๋ณด๋ ค๋ฉด ๋‹ค์Œ API ๋ฌธ์„œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ์„ watch() ๋ฉ”์„œ๋“œ์— ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ „๋‹ฌํ•˜์—ฌ change stream์ด ์ˆ˜์‹ ํ•  ์ด๋ฒคํŠธ๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์‚ฌ์šฉ ์ค‘์ธ MongoDB Server ๋ฒ„์ „์ด ์ง€์›ํ•˜๋Š” ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ์—ฐ์‚ฐ์ž๋ฅผ ์•Œ์•„๋ณด๋ ค๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ ์ˆ˜์ •์„ ์ฐธ์กฐํ•˜์„ธ์š”.

๋‹ค์Œ ์ฝ”๋“œ ์˜ˆ์ œ์—์„œ๋Š” ์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ ์šฉํ•˜์—ฌ ์‚ฝ์ž… ๋ฐ ์—…๋ฐ์ดํŠธ ์ž‘์—…์— ๋Œ€ํ•ด์„œ๋งŒ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ ์ค๋‹ˆ๋‹ค.

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

collection์— ๋Œ€ํ•œ ์—…๋ฐ์ดํŠธ ์ž‘์—…์€ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ถœ๋ ฅ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

MongoDB 7.0๋ถ€ํ„ฐ๋Š” $changeStreamSplitLargeEvent ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ๋‹จ๊ณ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ 16MB๋ฅผ ์ดˆ๊ณผํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ ๋” ์ž‘์€ ์กฐ๊ฐ์œผ๋กœ ๋ถ„ํ• ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ผญ ํ•„์š”ํ•œ ๊ฒฝ์šฐ์—๋งŒ $changeStreamSplitLargeEvent ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ์ „์ฒด ๋ฌธ์„œ ์‚ฌ์ „ ๋˜๋Š” ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ํ•„์š”ํ•˜๊ณ  16MB๋ฅผ ์ดˆ๊ณผํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•˜๋Š” ๊ฒฝ์šฐ $changeStreamSplitLargeEvent ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

$changeStreamSplitLargeEvent ๋‹จ๊ณ„๋Š” ํ”„๋ž˜๊ทธ๋จผํŠธ๋ฅผ ์ˆœ์ฐจ์ ์œผ๋กœ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. change stream ์ปค์„œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ”„๋ž˜๊ทธ๋จผํŠธ์— ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐ ํ”„๋ž˜๊ทธ๋จผํŠธ์—๋Š” ๋‹ค์Œ ํ•„๋“œ๋ฅผ ํฌํ•จํ•˜๋Š” SplitEvent ๊ฐ์ฒด๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

ํ•„๋“œ
์„ค๋ช…
fragment
์—์„œ ์‹œ์ž‘ํ•˜๋Š” ํ”„๋ž˜๊ทธ๋จผํŠธ์˜ ์ธ๋ฑ์Šค 1
of
๋ถ„ํ•  ์ด๋ฒคํŠธ๋ฅผ ๊ตฌ์„ฑํ•˜๋Š” ์ด ํ”„๋ž˜๊ทธ๋จผํŠธ ์ˆ˜

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” $changeStreamSplitLargeEvent ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ๋‹จ๊ณ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋Œ€๊ทœ๋ชจ ์ด๋ฒคํŠธ๋ฅผ splitํ•˜์—ฌ change stream์„ ์ˆ˜์ •ํ•ฉ๋‹ˆ๋‹ค.

ChangeStreamIterable<Document> changeStream = collection.watch(
Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

์ฐธ๊ณ 

์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ์—๋Š” $changeStreamSplitLargeEvent ๋‹จ๊ณ„๊ฐ€ ํ•˜๋‚˜๋งŒ ์žˆ์„ ์ˆ˜ ์žˆ์œผ๋ฉฐ ์ด ๋‹จ๊ณ„๊ฐ€ ํŒŒ์ดํ”„๋ผ์ธ์˜ ๋งˆ์ง€๋ง‰ ๋‹จ๊ณ„์—ฌ์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์™€ ๊ฐ™์ด change stream ์ปค์„œ์—์„œ getSplitEvent() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ SplitEvent ์— ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

$changeStreamSplitLargeEvent ์• ๊ทธ๋ฆฌ๊ฒŒ์ด์…˜ ๋‹จ๊ณ„์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ $changeStreamSplitLargeEvent ์„œ๋ฒ„ ์„ค๋ช…์„œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๋‹ค์Œ ๋ฐ์ดํ„ฐ๋ฅผ ํฌํ•จํ•˜๊ฑฐ๋‚˜ ์ƒ๋žตํ•˜๋„๋ก ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€ ์ž‘์—… ์ „์˜ ๋ฌธ์„œ์˜ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฌธ์„œ(์žˆ๋Š” ๊ฒฝ์šฐ)

  • ์‚ฌํ›„ ์ด๋ฏธ์ง€ ์ž‘์—… ํ›„์˜ ๋ฌธ์„œ์˜ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฌธ์„œ(์žˆ๋Š” ๊ฒฝ์šฐ)

์ค‘์š”

๋ฐฐํฌ์—์„œ MongoDB v6.0 ์ด์ƒ์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ์—๋งŒ collection์—์„œ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋˜๋Š” ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ํฌํ•จ๋œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๋ ค๋ฉด ๋‹ค์Œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • MongoDB deployment์—์„œ collection์— ๋Œ€ํ•œ ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•ฉ๋‹ˆ๋‹ค.

    ํŒ

    ๋ฐฐํฌ์—์„œ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์„œ๋ฒ„ ์„ค๋ช…์„œ์˜ Change Streams with Document Pre- and Post-Images(์ด๋ฏธ์ง€ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ๋ฌธ์„œํ™”์˜ Change Streams)๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

    ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ํ™œ์„ฑํ™”๋œ ์ปฌ๋ ‰์…˜์„ ์ƒ์„ฑํ•˜๋„๋ก ๋“œ๋ผ์ด๋ฒ„์— ์ง€์‹œํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด๋ ค๋ฉด ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ํ™œ์„ฑํ™” ๋œ ์ปฌ๋ ‰์…˜ ์ƒ์„ฑ ์„น์…˜์„ ์ฐธ์กฐํ•˜์„ธ์š”.

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€์™€ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ์ค‘ ํ•˜๋‚˜ ๋˜๋Š” ๋‘˜ ๋‹ค๋ฅผ ๊ฒ€์ƒ‰ํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    ํŒ

    ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ์— ์‚ฌ์ „ ์ด๋ฏธ์ง€๋ฅผ ๊ธฐ๋กํ•˜๋„๋ก ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•˜๋ ค๋ฉด ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๊ตฌ์„ฑ ์˜ˆ์‹œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

    ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ์— ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ๊ธฐ๋กํ•˜๋„๋ก ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•˜๋ ค๋ฉด ์‚ฌํ›„ ์ด๋ฏธ์ง€ ๊ตฌ์„ฑ ์˜ˆ์‹œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ์˜ต์…˜์ด ํ™œ์„ฑํ™”๋œ collection์„ ์ƒ์„ฑํ•˜๋ ค๋ฉด ChangeStreamPreAndPostImagesOptions createCollection() ๋‹ค์Œ ์˜ˆ์™€ ๊ฐ™์ด ์ธ์Šคํ„ด์Šค๋ฅผ ์ง€์ •ํ•˜๊ณ  ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

MongoDB Shell์—์„œ collMod ๋ช…๋ น์„ ์‹คํ–‰ํ•˜์—ฌ ๊ธฐ์กด ์ปฌ๋ ‰์…˜์˜ ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ์˜ต์…˜์„ ๋ณ€๊ฒฝํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด๋ ค๋ฉด MongoDB Server ๋งค๋‰ด์–ผ์˜ collMod ํ•ญ๋ชฉ์„ ์ฐธ์กฐํ•˜์„ธ์š”.

๊ฒฝ๊ณ 

collection์—์„œ ์‚ฌ์ „ ์ด๋ฏธ์ง€ ๋˜๋Š” ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•œ ๊ฒฝ์šฐ collMod ์œผ)๋กœ ์ด๋Ÿฌํ•œ ์„ค์ •์„ ์ˆ˜์ •ํ•˜๋ฉด ํ•ด๋‹น collection์˜ ๊ธฐ์กด change stream์ด ์‹คํŒจํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์ฝ”๋“œ ์˜ˆ์‹œ์—์„œ๋Š” ์‚ฌ์ „ myColl ์ด๋ฏธ์ง€๋ฅผ ํฌํ•จํ•˜๊ณ  ์ด๋ฒคํŠธ๋ฅผ ์ถœ๋ ฅํ•˜๋„๋ก collection์—์„œ change stream์„ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ ์ค๋‹ˆ๋‹ค.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

์•ž์˜ ์˜ˆ์—์„œ๋Š” FullDocumentBeforeChange.REQUIRED ์˜ต์…˜์„ ์‚ฌ์šฉํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค. ์ด ์˜ต์…˜์€ ๊ต์ฒด, ์—…๋ฐ์ดํŠธ ๋ฐ ์‚ญ์ œ ์ด๋ฒคํŠธ์— ๋Œ€ํ•ด ์‚ฌ์ „ ์ด๋ฏธ์ง€๋ฅผ ์š”๊ตฌํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค. ์‚ฌ์ „ ์ด๋ฏธ์ง€๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๋Š” ๊ฒฝ์šฐ ๋“œ๋ผ์ด๋ฒ„์—์„œ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.

๋ฌธ์„œ์˜ amount ํ•„๋“œ ๊ฐ’์„ 150 ์—์„œ 2000 ๋กœ ์—…๋ฐ์ดํŠธํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. ์ด ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ถœ๋ ฅ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

์˜ต์…˜ ๋ชฉ๋ก์€ FullDocumentBeforeChange ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”. API ๋ฌธ์„œ.

๋‹ค์Œ ์ฝ”๋“œ ์˜ˆ์‹œ์—์„œ๋Š” ์‚ฌ์ „ myColl ์ด๋ฏธ์ง€๋ฅผ ํฌํ•จํ•˜๊ณ  ์ด๋ฒคํŠธ๋ฅผ ์ถœ๋ ฅํ•˜๋„๋ก collection์—์„œ change stream์„ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ ์ค๋‹ˆ๋‹ค.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

์•ž์˜ ์˜ˆ์—์„œ๋Š” FullDocument.WHEN_AVAILABLE ์˜ต์…˜์„ ์‚ฌ์šฉํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค. ์ด ์˜ต์…˜์€ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๊ฒฝ์šฐ ๊ต์ฒด ๋ฐ ์—…๋ฐ์ดํŠธ ์ด๋ฒคํŠธ์— ๋Œ€ํ•ด ์ˆ˜์ •๋œ ๋ฌธ์„œ์˜ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋„๋ก change stream์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

๋ฌธ์„œ์˜ color ํ•„๋“œ ๊ฐ’์„ "purple" ์—์„œ "pink" ๋กœ ์—…๋ฐ์ดํŠธํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ถœ๋ ฅ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

์˜ต์…˜ ๋ชฉ๋ก์€ FullDocument ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”. API ๋ฌธ์„œ.

๋Œ์•„๊ฐ€๊ธฐ

์ปค์„œ์—์„œ ๋ฐ์ดํ„ฐ ์•ก์„ธ์Šค