Docs Menu

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ํ†ตํ•ด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ์‚ฌ์ „์— ๋ณต์žกํ•œ ๋ฐฉ์‹ ๋ฐ ์ˆ˜๋™์œผ๋กœ oplog๋ฅผ ํ…Œ์ผ๋งํ•˜๋Š” ์œ„ํ—˜ ์—†์ด ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ์— ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋‹จ์ผ ์ปฌ๋ ‰์…˜, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋˜๋Š” ์ „์ฒด ๋ฐฐํฌ์˜ ๋ชจ๋“  ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๊ตฌ๋…ํ•˜๊ณ  ์ด์— ์ฆ‰์‹œ ๋Œ€์‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ์ง‘๊ณ„ ํ”„๋ ˆ์ž„์›Œํฌ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ํŠน์ • ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ํ•„ํ„ฐ๋งํ•˜๊ฑฐ๋‚˜ ์•Œ๋ฆผ์„ ๋งˆ์Œ๋Œ€๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

MongoDB 5.1๋ถ€ํ„ฐ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์ตœ์ ํ™”๋˜์–ด ๋” ํšจ์œจ์ ์ธ ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ๋ฅ ๊ณผ ์ผ๋ถ€ ์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„์˜ ๋” ๋น ๋ฅธ ์‹คํ–‰์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋ณต์ œ๋ณธ ์„ธํŠธ ๋ฐ ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ์— ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค:

  • ์Šคํ† ๋ฆฌ์ง€ ์—”์ง„

    ๋ณต์ œ๋ณธ ์„ธํŠธ์™€ ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ๋Š” WiredTiger ์Šคํ† ๋ฆฌ์ง€ ์—”์ง„์„ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. Change Stream์€ MongoDB์˜ ๋ฏธ์‚ฌ์šฉ ๋ฐ์ดํ„ฐ ์•”ํ˜ธํ™” ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฐํฌ์—์„œ๋„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋ณต์ œ๋ณธ ์„ธํŠธ ํ”„๋กœํ† ์ฝœ ๋ฒ„์ „.

    ๋ณต์ œ๋ณธ ์„ธํŠธ์™€ ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ๋Š” ๋ณต์ œ๋ณธ ์„ธํŠธ ํ”„๋กœํ† ์ฝœ ๋ฒ„์ „ 1(pv1)๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • ์ฝ๊ธฐ ๊ณ ๋ ค '๋Œ€๋‹ค์ˆ˜' ํ™œ์„ฑํ™”

    ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์€ "majority" ์ฝ๊ธฐ ๊ณ ๋ ค ์ง€์›์— ๊ด€๊ณ„์—†์ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ฆ‰, ์ฝ๊ธฐ ๊ณ ๋ ค majority ์ง€์›์„ ํ™œ์„ฑํ™”(๊ธฐ๋ณธ๊ฐ’)ํ•˜๊ฑฐ๋‚˜ ๋น„ํ™œ์„ฑํ™” ํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ Stable API V1 ์— ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ showExpandedEvents ์˜ต์…˜์€ Stable API V1 ์— ํฌํ•จ๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ์€ +srv ์—ฐ๊ฒฐ ์˜ต์…˜๊ณผ ํ•จ๊ป˜ DNS ์‹œ๋“œ ๋ชฉ๋ก์„ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜ ์—ฐ๊ฒฐ ๋ฌธ์ž์—ด์— ์„œ๋ฒ„๋ฅผ ๊ฐœ๋ณ„์ ์œผ๋กœ ๋‚˜์—ดํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋“œ๋ผ์ด๋ฒ„๊ฐ€ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ์„ ์žƒ๊ฑฐ๋‚˜ ์—ฐ๊ฒฐ์ด ๋‹ค์šด๋˜๋ฉด ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด ์ฝ๊ธฐ ์„ค์ •์ด ์ผ์น˜ํ•˜๋Š” ๋‹ค๋ฅธ ๋…ธ๋“œ๋ฅผ ํ†ตํ•ด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ์„ ๋‹ค์‹œ ์„ค์ •ํ•˜๋ ค๊ณ  ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค. ๋“œ๋ผ์ด๋ฒ„๊ฐ€ ์˜ฌ๋ฐ”๋ฅธ ์ฝ๊ธฐ ์„ค์ •์„ ๊ฐ€์ง„ ๋…ธ๋“œ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์œผ๋ฉด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.

์ž์„ธํ•œ ๋‚ด์šฉ์€ ์—ฐ๊ฒฐ ๋ฌธ์ž์—ด URI ํ˜•์‹์„ ์ฐธ์กฐํ•˜์„ธ์š”.

๋‹ค์Œ์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋Œ€์ƒ
์„ค๋ช…

์ปฌ๋ ‰์…˜

๋‹จ์ผ ์ปฌ๋ ‰์…˜(system ์ปฌ๋ ‰์…˜ ๋˜๋Š” admin, local ๋ฐ config ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ๋ชจ๋“  ์ปฌ๋ ‰์…˜ ์ œ์™ธ)์— ๋Œ€ํ•ด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋ฅผ ์—ด ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ํŽ˜์ด์ง€์˜ ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹จ์ผ ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•ด ์ปค์„œ๋ฅผ ์—ด๊ณ  ์ž‘์—…ํ•ฉ๋‹ˆ๋‹ค. mongosh ๋ฉ”์„œ๋“œ db.collection.watch()๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค

๋‹จ์ผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค( admin, local ๋ฐ config ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ œ์™ธ)์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋ฅผ ์—ด์–ด ๋ชจ๋“  ๋น„์‹œ์Šคํ…œ ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๊ด€์ฐฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

MongoDB ๋“œ๋ผ์ด๋ฒ„ ๋ฉ”์„œ๋“œ์— ๋Œ€ํ•ด์„œ๋Š” ๋“œ๋ผ์ด๋ฒ„ ์„ค๋ช…์„œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”. mongosh ๋ฉ”์„œ๋“œ db.watch()๋„ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ฐฐํฌ

๋ฐฐํฌ(๋ณต์ œ๋ณธ ์„ธํŠธ ๋˜๋Š” ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ)์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋ฅผ ์—ด์–ด admin, local ๋ฐ config๋ฅผ ์ œ์™ธํ•œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ „๋ฐ˜์—์„œ ๋ชจ๋“  ๋น„์‹œ์Šคํ…œ ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

MongoDB ๋“œ๋ผ์ด๋ฒ„ ๋ฉ”์„œ๋“œ์— ๋Œ€ํ•ด์„œ๋Š” ๋“œ๋ผ์ด๋ฒ„ ์„ค๋ช…์„œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”. mongosh ๋ฉ”์„œ๋“œ Mongo.watch()๋„ ์ฐธ์กฐํ•˜์„ธ์š”.

์ฐธ๊ณ 

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์˜ˆ์‹œ

์ด ํŽ˜์ด์ง€์˜ ์˜ˆ์‹œ์—์„œ๋Š” ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋ฅผ ์—ด๊ณ  ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋กœ ์ž‘์—…ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•˜๊ธฐ ์œ„ํ•ด MongoDB ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋Œ€ํ•ด ์—ด๋ ค ์žˆ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์˜ ์–‘์ด ์—ฐ๊ฒฐ ํ’€ ํฌ๊ธฐ๋ฅผ ์ดˆ๊ณผํ•  ๊ฒฝ์šฐ ์•Œ๋ฆผ ์ง€์—ฐ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋‹ค์Œ ์ด๋ฒคํŠธ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์‹œ๊ฐ„ ๋™์•ˆ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ๊ณผ GetMore ์ž‘์—…์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์ง€์—ฐ ์‹œ๊ฐ„ ๋ฌธ์ œ๋ฅผ ๋ฐฉ์ง€ํ•˜๋ ค๋ฉด ํ’€ ํฌ๊ธฐ๊ฐ€ ์—ด๋ ค ์žˆ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ˆ˜๋ณด๋‹ค ํฐ์ง€ ํ™•์ธํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ MaxPoolSize ์„ค์ •์„ ์ฐธ์กฐํ•˜์„ธ์š”.

์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์—ด๋ฆฌ๋Š” ๊ฒฝ์šฐ:

  • mongos๋Š” ๊ฐ ์ƒค๋“œ์—์„œ ๊ฐœ๋ณ„์ ์ธ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. ์ด ๋™์ž‘์€ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ํŠน์ • ์ƒค๋“œ ํ‚ค ๋ฒ”์œ„๋ฅผ ๋Œ€์ƒ์œผ๋กœ ํ•˜๋Š”์ง€ ์—ฌ๋ถ€์— ๊ด€๊ณ„์—†์ด ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.

  • mongos์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๊ฒฐ๊ณผ๋ฅผ ์ˆ˜์‹ ํ•˜๋ฉด ํ•ด๋‹น ๊ฒฐ๊ณผ๊ฐ€ ์ •๋ ฌ ๋ฐ ํ•„ํ„ฐ๋ง๋ฉ๋‹ˆ๋‹ค. ํ•„์š”ํ•œ ๊ฒฝ์šฐ mongos์—์„œ fullDocument ์กฐํšŒ๋„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

์ตœ์ƒ์˜ ์„ฑ๋Šฅ์„ ์–ป์œผ๋ ค๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์—์„œ $lookup ์ฟผ๋ฆฌ ์‚ฌ์šฉ์„ ์ œํ•œํ•˜์„ธ์š”.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด๋ ค๋ฉด ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

  • ๋ณต์ œ๋ณธ ์„ธํŠธ์˜ ๊ฒฝ์šฐ ๋ฐ์ดํ„ฐ ๋ณด์œ  ๋ฉค๋ฒ„ ์ค‘ ํ•˜๋‚˜์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์—ด๊ธฐ ์ž‘์—…์„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ์˜ ๊ฒฝ์šฐ mongos์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์—ด๊ธฐ ์ž‘์—…์„ ์‹คํ–‰ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์‹œ๋Š” ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด๊ณ  ์ปค์„œ๋ฅผ ๋ฐ˜๋ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๋ฌธ์„œ๋ฅผ ์กฐํšŒํ•ฉ๋‹ˆ๋‹ค. [1]


โžค ์˜ค๋ฅธ์ชฝ ์ƒ๋‹จ์˜ ์–ธ์–ด ์„ ํƒ ๋“œ๋กญ๋‹ค์šด ๋ฉ”๋‰ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด ํŽ˜์ด์ง€์— ์žˆ๋Š” ์˜ˆ์‹œ์˜ ์–ธ์–ด๋ฅผ ์„ค์ •ํ•˜์„ธ์š”.


์•„๋ž˜์˜ C ์˜ˆ์ œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ ์— ์—ฐ๊ฒฐํ•˜๊ณ  ์ปฌ๋ ‰์…˜ ์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  inventory ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

์•„๋ž˜์˜ C# ์˜ˆ์ œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ ์— ์—ฐ๊ฒฐํ•˜๊ณ  ์ปฌ๋ ‰์…˜ ์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  inventory ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

์•„๋ž˜ Go ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ–ˆ๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

์•„๋ž˜ Java ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ–ˆ๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šค ํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

์•„๋ž˜์˜ ์ฝ”ํ‹€๋ฆฐ (Kotlin) ์˜ˆ์ œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ ์— ์—ฐ๊ฒฐ๋˜์–ด ์žˆ๊ณ  inventory ์ปฌ๋ ‰์…˜ ์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์— ์•ก์„ธ์Šค ํ•  ์ˆ˜ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ์ž‘์—…์„ ์™„๋ฃŒํ•˜๋Š” ๋ฐฉ๋ฒ•์— ํ•™์Šต ๋ณด๋ ค๋ฉด ์ฝ”ํ‹€๋ฆฐ (Kotlin) ๋ฃจํ‹ด ๋“œ๋ผ์ด๋ฒ„ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ์ปฌ๋ ‰์…˜ ๊ฐ€์ด๋“œ ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

์•„๋ž˜ ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ•˜๊ณ  inventory ์ปฌ๋ ‰์…˜์„ ํฌํ•จํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šค ํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

cursor = db.inventory.watch()
document = await cursor.next()

์•„๋ž˜ Node.js ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ•˜๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ๋Š” ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•˜๋Š” ์˜ˆ์‹œ์ž…๋‹ˆ๋‹ค.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream
.on('change', next => {
// process next document
})
.once('error', () => {
// handle error
});

๋˜๋Š” ๋ฐ˜๋ณต๊ธฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream์€ EventEmitter๋ฅผ ํ™•์žฅํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ–ˆ๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

์•„๋ž˜ Python ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ–ˆ๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

cursor = db.inventory.watch()
next(cursor)

์•„๋ž˜ ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ–ˆ๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

cursor = inventory.watch.to_enum
next_change = cursor.next

์•„๋ž˜ Swift (๋น„๋™๊ธฐ) ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ•˜๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

์•„๋ž˜ Swift(๋™๊ธฐ) ์˜ˆ์‹œ์—์„œ๋Š” MongoDB ๋ณต์ œ๋ณธ ์„ธํŠธ์— ์—ฐ๊ฒฐํ•˜๊ณ  inventory ์ปฌ๋ ‰์…˜์ด ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์•ก์„ธ์Šคํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

์ปค์„œ์—์„œ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ์กฐํšŒํ•˜๋ ค๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋ฅผ ๋ฐ˜๋ณตํ•ฉ๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Change Events๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋Š” ๋‹ค์Œ ์ค‘ ํ•˜๋‚˜๊ฐ€ ๋ฐœ์ƒํ•  ๋•Œ๊นŒ์ง€ ์—ด๋ ค ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์ปค์„œ๊ฐ€ ๋ช…์‹œ์ ์œผ๋กœ ๋‹ซํž™๋‹ˆ๋‹ค.

  • ๋ฌดํšจํ™” ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์ปฌ๋ ‰์…˜ ์ œ๊ฑฐ ๋˜๋Š” ์ด๋ฆ„ ๋ฐ”๊พธ๊ธฐ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

  • MongoDB ๋ฐฐํฌ์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ์ด ๋‹ซํžˆ๊ฑฐ๋‚˜ ์‹œ๊ฐ„ ์ดˆ๊ณผ๋ฉ๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ Cursor Behaviors๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

  • ๋ฐฐํฌ๊ฐ€ ์ƒค๋”ฉ๋œ ํด๋Ÿฌ์Šคํ„ฐ์ธ ๊ฒฝ์šฐ ์ƒค๋“œ๋ฅผ ์ œ๊ฑฐํ•˜๋ฉด ์—ด๋ ค ์žˆ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๊ฐ€ ๋‹ซํž ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‹ซํžŒ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๋Š” ์™„์ „ํžˆ ์žฌ๊ฐœ๋˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ฐธ๊ณ 

๋‹ซํžˆ์ง€ ์•Š์€ ์ปค์„œ์˜ ๋ผ์ดํ”„์‚ฌ์ดํด์€ ์–ธ์–ด์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค.

[1] startAtOperationTime์„ ์ง€์ •ํ•˜์—ฌ ํŠน์ • ์‹œ์ ์— ์ปค์„œ๋ฅผ ์—ด ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ง€์ •๋œ ์‹œ์ž‘์ ์ด ๊ณผ๊ฑฐ์ธ ๊ฒฝ์šฐ oplog์˜ ์‹œ๊ฐ„ ๋ฒ”์œ„ ๋‚ด์— ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

โžค ์˜ค๋ฅธ์ชฝ ์ƒ๋‹จ์˜ ์–ธ์–ด ์„ ํƒ ๋“œ๋กญ๋‹ค์šด ๋ฉ”๋‰ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด ํŽ˜์ด์ง€์— ์žˆ๋Š” ์˜ˆ์‹œ์˜ ์–ธ์–ด๋ฅผ ์„ค์ •ํ•˜์„ธ์š”.


๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline ๋ชฉ๋ก์—๋Š” ๋‹ค์Œ ๊ธฐ์ค€ ์ค‘ ํ•˜๋‚˜ ๋˜๋Š” ๋‘˜ ๋‹ค๋ฅผ ์ถฉ์กฑํ•˜๋Š” ์ž‘์—…์„ ํ•„ํ„ฐ๋งํ•˜๋Š” ๋‹จ์ผ $match ๋‹จ๊ณ„๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • username ๊ฐ’์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. alice

  • operationType ๊ฐ’์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. delete

pipeline์„ watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•˜๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์ง€์ •๋œ pipeline์„ ํ†ตํ•ด ์•Œ๋ฆผ์„ ์ „๋‹ฌํ•œ ํ›„ ์•Œ๋ฆผ์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ์ง€์‹œํ•ฉ๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

pipeline ๋ชฉ๋ก์—๋Š” ๋‹ค์Œ ๊ธฐ์ค€ ์ค‘ ํ•˜๋‚˜ ๋˜๋Š” ๋‘˜ ๋‹ค๋ฅผ ์ถฉ์กฑํ•˜๋Š” ์ž‘์—…์„ ํ•„ํ„ฐ๋งํ•˜๋Š” ๋‹จ์ผ $match ๋‹จ๊ณ„๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • username ๊ฐ’์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. alice

  • operationType ๊ฐ’์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. delete

pipeline์„ watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•˜๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์ง€์ •๋œ pipeline์„ ํ†ตํ•ด ์•Œ๋ฆผ์„ ์ „๋‹ฌํ•œ ํ›„ ์•Œ๋ฆผ์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ์ง€์‹œํ•ฉ๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ๋Š” ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•˜๋Š” ์˜ˆ์‹œ์ž…๋‹ˆ๋‹ค.

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

๋˜๋Š” ๋ฐ˜๋ณต๊ธฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋‹ค์Œ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ฐฐ์—ด์„ ์ œ๊ณตํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์„ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

ํŒ

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ ๋ฌธ์„œ์˜ _id ํ•„๋“œ๋Š” ์žฌ๊ฐœ ํ† ํฐ ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ์˜ _id ํ•„๋“œ๋ฅผ ์ˆ˜์ •ํ•˜๊ฑฐ๋‚˜ ์ œ๊ฑฐํ•˜๊ธฐ ์œ„ํ•ด ํŒŒ์ดํ”„๋ผ์ธ์„ ์‚ฌ์šฉํ•˜์ง€ ๋งˆ์„ธ์š”.

MongoDB 4.2๋ถ€ํ„ฐ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ์ด ์ด๋ฒคํŠธ์˜ _id ํ•„๋“œ๋ฅผ ์ˆ˜์ •ํ•˜๋Š” ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ต๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์‘๋‹ต ๋ฌธ์„œ ํ˜•์‹์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Change Events๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๊ธฐ๋ณธ์ ์œผ๋กœ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ์—…๋ฐ์ดํŠธ ์ž‘์—… ์ค‘์— ํ•„๋“œ์˜ ๋ธํƒ€๋งŒ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ ๊ณผ๋ฐ˜์ˆ˜ ์ปค๋ฐ‹ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


โžค ์˜ค๋ฅธ์ชฝ ์ƒ๋‹จ์˜ ์–ธ์–ด ์„ ํƒ ๋“œ๋กญ๋‹ค์šด ๋ฉ”๋‰ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด ํŽ˜์ด์ง€์— ์žˆ๋Š” ์˜ˆ์‹œ์˜ ์–ธ์–ด๋ฅผ ์„ค์ •ํ•˜์„ธ์š”.


์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ์ตœ์‹  ๋‹ค์ˆ˜ ์ปค๋ฐ‹ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด "updateLookup" ๊ฐ’๊ณผ ํ•จ๊ป˜ "fullDocument" ์˜ต์…˜์„ mongoc_collection_watch ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” fullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"์„ db.collection.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” FullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ณผ๋ฐ˜์ˆ˜ ์ปค๋ฐ‹๋œ ์ตœ์‹  ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด SetFullDocument(options.UpdateLookup) ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์˜ต์…˜์„ ์‚ฌ์šฉํ•˜์„ธ์š”.

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด FullDocument.UPDATE_LOOKUP์„(๋ฅผ) {3} ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” FullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ ๋‹ค์ˆ˜ FullDocument.UPDATE_LOOKUP ์ปค๋ฐ‹ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด ์„ ChangeStreamFlow.fullDocument() ๋ฉ”์„œ๋“œ.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” FullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด full_document='updateLookup'์„ db.collection.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” `full_document ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด { fullDocument: 'updateLookup' }์„ db.collection.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” fullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ๋Š” ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•˜๋Š” ์˜ˆ์‹œ์ž…๋‹ˆ๋‹ค.

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

๋˜๋Š” ๋ฐ˜๋ณต๊ธฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ์„ธ์Šคํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"์„ db.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” fullDocument ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด full_document='updateLookup'์„ db.collection.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” full_document ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด full_document: 'updateLookup'์„ db.watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ ๋ชจ๋“  ์—…๋ฐ์ดํŠธ ์ž‘์—… ์•Œ๋ฆผ์—๋Š” ์—…๋ฐ์ดํŠธ ์ž‘์—…์˜ ์˜ํ–ฅ์„ ๋ฐ›๋Š” ๋ฌธ์„œ์˜ ํ˜„์žฌ ๋ฒ„์ „์„ ๋‚˜ํƒ€๋‚ด๋Š” full_document ํ•„๋“œ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด options: ChangeStreamOptions(fullDocument: .updateLookup)์„(๋ฅผ) {3} ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ปค๋ฐ‹๋œ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋ ค๋ฉด options: ChangeStreamOptions(fullDocument: .updateLookup)์„(๋ฅผ) {3} ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

์ฐธ๊ณ 

์—…๋ฐ์ดํŠธ ์ž‘์—… ํ›„, ์กฐํšŒ ์ „์— ์—…๋ฐ์ดํŠธ๋œ ๋ฌธ์„œ๋ฅผ ์ˆ˜์ •ํ•œ ๋Œ€๋‹ค์ˆ˜ ์ปค๋ฐ‹ ์ž‘์—…์ด ํ•˜๋‚˜ ์ด์ƒ ์žˆ๋Š” ๊ฒฝ์šฐ ๋ฐ˜ํ™˜๋˜๋Š” ์ „์ฒด ๋ฌธ์„œ๊ฐ€ ์—…๋ฐ์ดํŠธ ์ž‘์—… ์‹œ์ ์˜ ๋ฌธ์„œ์™€ ํฌ๊ฒŒ ๋‹ค๋ฅผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ทธ๋Ÿฌ๋‚˜ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๋ฌธ์„œ์— ํฌํ•จ๋œ ๋ธํƒ€๋Š” ํ•ญ์ƒ ํ•ด๋‹น ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ์— ์ ์šฉ๋œ ๊ฐ์‹œ ๋Œ€์ƒ ์ปฌ๋ ‰์…˜ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ์ •ํ™•ํ•˜๊ฒŒ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์ค‘ ํ•˜๋‚˜์— ํ•ด๋‹นํ•˜๋Š” ๊ฒฝ์šฐ ์—…๋ฐ์ดํŠธ ์ด๋ฒคํŠธ์˜ fullDocument ํ•„๋“œ๊ฐ€ ๋ˆ„๋ฝ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋ฌธ์„œ๊ฐ€ ์‚ญ์ œ๋˜๊ฑฐ๋‚˜ ์—…๋ฐ์ดํŠธ์™€ ์กฐํšŒ ์‚ฌ์ด์— ์ปฌ๋ ‰์…˜์ด ์‚ญ์ œ๋œ ๊ฒฝ์šฐ.

  • ์—…๋ฐ์ดํŠธ๋กœ ์ธํ•ด ํ•ด๋‹น ์ปฌ๋ ‰์…˜์˜ ์ƒค๋“œ ํ‚ค์— ์žˆ๋Š” ํ•„๋“œ ์ค‘ ํ•˜๋‚˜ ์ด์ƒ์˜ ๊ฐ’์ด ๋ณ€๊ฒฝ๋˜๋Š” ๊ฒฝ์šฐ.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์‘๋‹ต ๋ฌธ์„œ ํ˜•์‹์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Change Events๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ์ปค์„œ๋ฅผ ์—ด ๋•Œ ์žฌ๊ฐœ ํ† ํฐ์„ resumeAfter ๋˜๋Š” startAfter๋กœ ์ง€์ •ํ•˜์—ฌ ์žฌ๊ฐœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ปค์„œ๋ฅผ ์—ฌ๋Š” ์‹œ์ ์— ์žฌ๊ฐœ(resume) ํ† ํฐ์„ resumeAfter์— ์ „๋‹ฌํ•˜์—ฌ ํŠน์ • ์ด๋ฒคํŠธ ์ดํ›„์— ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์žฌ๊ฐœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์žฌ๊ฐœ ํ† ํฐ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์žฌ๊ฐœ ํ† ํฐ์„ ์ฐธ์กฐํ•˜์„ธ์š”.

์ค‘์š”

  • ํƒ€์ž„์Šคํƒฌํ”„๊ฐ€ ๊ณผ๊ฑฐ์— ์žˆ์—ˆ๋˜ ๊ฒฝ์šฐ, oplog์— ํ† ํฐ ๋˜๋Š” ํƒ€์ž„์Šคํƒฌํ”„์™€ ๊ด€๋ จ๋œ ์ž‘์—…์„ ์ฐพ์„ ์ˆ˜ ์žˆ๋Š” ์ถฉ๋ถ„ํ•œ ๊ธฐ๋ก์ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • ๋ฌดํšจํ™” ์ด๋ฒคํŠธ(์˜ˆ์‹œ: ์ปฌ๋ ‰์…˜ ์ œ๊ฑฐ ๋˜๋Š” ์ด๋ฆ„ ๋ฐ”๊พธ๊ธฐ)๋กœ ์ธํ•ด ์ŠคํŠธ๋ฆผ์ด ๋‹ซํžŒ ํ›„์—๋Š” resumeAfter๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ๋Œ€์‹  ๋ฌดํšจํ™” ์ด๋ฒคํŠธ ํ›„ startAfter๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ƒˆ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์•„๋ž˜ ์˜ˆ์—์„œ๋Š” ์ŠคํŠธ๋ฆผ์ด ์‚ญ์ œ๋œ ํ›„ ์ŠคํŠธ๋ฆผ์„ ๋‹ค์‹œ ์ƒ์„ฑํ•˜๊ธฐ ์œ„ํ•ด ์ŠคํŠธ๋ฆผ ์˜ต์…˜์— resumeAfter ์˜ต์…˜์ด ์ถ”๊ฐ€๋ฉ๋‹ˆ๋‹ค. ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— _id์„ ์ „๋‹ฌํ•˜๋ฉด ์ง€์ •๋œ ์ž‘์—… ์ดํ›„๋ถ€ํ„ฐ ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๊ณ  ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค.

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

์•„๋ž˜ ์˜ˆ์—์„œ resumeToken์€ ๋งˆ์ง€๋ง‰ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ๋ฌธ์„œ์—์„œ ๊ฒ€์ƒ‰๋˜์–ด ์˜ต์…˜์œผ๋กœ Watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌ๋ฉ๋‹ˆ๋‹ค. resumeToken์„ Watch() ๋ฉ”์„œ๋“œ์— ์ „๋‹ฌํ•˜๋ฉด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ์ดํ›„๋ถ€ํ„ฐ ์•Œ๋ฆผ์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•˜๋„๋ก ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค.

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

ChangeStreamOptions.SetResumeAfter๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ์žฌ๊ฐœ ํ† ํฐ์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter ์˜ต์…˜์ด ์„ค์ •๋œ ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค. SetResumeAfter๋Š” ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์ทจํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์‹œ: ์•„๋ž˜ ์˜ˆ์‹œ์—์„œ resumeToken.

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resumeAfter() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter() ๋ฉ”์„œ๋“œ๋Š” ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’(์˜ˆ: ์•„๋ž˜ ์˜ˆ์‹œ์˜ resumeToken)์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

ChangeStreamFlow.resumeAfter() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„ ์•Œ๋ฆผ ์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค. resumeAfter() ๋ฉ”์„œ๋“œ๋Š” ์•„๋ž˜ ์˜ˆ์‹œ resumeToken ๋ณ€์ˆ˜์™€ ๊ฐ™์ด ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ•ด์„๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resume_after ์ˆ˜์ •์ž๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resume_after ์ˆ˜์ •์ž๋Š” ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธํ•ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์•„๋ž˜ ์˜ˆ์‹œ์˜ resume_token์„(๋ฅผ) ๋งํ•ฉ๋‹ˆ๋‹ค.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resumeAfter ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter ์˜ต์…˜์€ ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์ทจํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์‹œ: ์•„๋ž˜ ์˜ˆ์—์„œ resumeToken.

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream
.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream
.on('change', next => {
processChange(next);
})
.once('error', error => {
// handle error
});
})
.once('error', error => {
// handle error
});

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resumeAfter ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter ์˜ต์…˜์€ ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์ทจํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์‹œ: ์•„๋ž˜ ์˜ˆ์—์„œ $resumeToken.

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resume_after ์ˆ˜์ •์ž๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resume_after ์ˆ˜์ •์ž๋Š” ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธํ•ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์•„๋ž˜ ์˜ˆ์‹œ์˜ resume_token์„(๋ฅผ) ๋งํ•ฉ๋‹ˆ๋‹ค.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resume_after ์ˆ˜์ •์ž๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resume_after ์ˆ˜์ •์ž๋Š” ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธํ•ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์•„๋ž˜ ์˜ˆ์‹œ์˜ resume_token์„(๋ฅผ) ๋งํ•ฉ๋‹ˆ๋‹ค.

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resumeAfter ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter ์˜ต์…˜์€ ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์ทจํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์‹œ: ์•„๋ž˜ ์˜ˆ์—์„œ resumeToken.

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

์žฌ๊ฐœ ํ† ํฐ์— ์ง€์ •๋œ ์ž‘์—… ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•˜๋ ค๋ฉด resumeAfter ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. resumeAfter ์˜ต์…˜์€ ์žฌ๊ฐœ ํ† ํฐ์œผ๋กœ ํ™•์ธ๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ’์„ ์ทจํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์‹œ: ์•„๋ž˜ ์˜ˆ์—์„œ resumeToken.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

์ปค์„œ๋ฅผ ์—ฌ๋Š” ์‹œ์ ์— ์žฌ๊ฐœ(resume) ํ† ํฐ์„ startAfter์— ์ „๋‹ฌํ•˜์—ฌ ํŠน์ • ์ด๋ฒคํŠธ ์ดํ›„์— ์ƒˆ๋กœ์šด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. restartAfter ์™€ ๋‹ฌ๋ฆฌ startAfter๋Š” ์ƒˆ๋กœ์šด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•˜์—ฌ ๋ฌดํšจํ™” ์ด๋ฒคํŠธ ํ›„์— ์•Œ๋ฆผ์„ ์žฌ๊ฐœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์žฌ๊ฐœ ํ† ํฐ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์žฌ๊ฐœ ํ† ํฐ์„ ์ฐธ์กฐํ•˜์„ธ์š”.

์ค‘์š”

  • ํƒ€์ž„์Šคํƒฌํ”„๊ฐ€ ๊ณผ๊ฑฐ์— ์žˆ์—ˆ๋˜ ๊ฒฝ์šฐ, oplog์— ํ† ํฐ ๋˜๋Š” ํƒ€์ž„์Šคํƒฌํ”„์™€ ๊ด€๋ จ๋œ ์ž‘์—…์„ ์ฐพ์„ ์ˆ˜ ์žˆ๋Š” ์ถฉ๋ถ„ํ•œ ๊ธฐ๋ก์ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์žฌ๊ฐœ ํ† ํฐ์€ ์—ฌ๋Ÿฌ ์ถœ์ฒ˜์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์†Œ์Šค
์„ค๋ช…

๊ฐ ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ ์•Œ๋ฆผ์€ _id ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค.

$changeStream ์ง‘๊ณ„ ๋‹จ๊ณ„์—๋Š” cursor.postBatchResumeToken ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์ด ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

์ด ํ•„๋“œ๋Š” aggregate ๋ช…๋ น์„ ์‚ฌ์šฉํ•  ๋•Œ๋งŒ ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค.

getMore ๋ช…๋ น์€ cursor.postBatchResumeToken ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค.

MongoDB 4.2๋ถ€ํ„ฐ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ง‘๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ์ด ์ด๋ฒคํŠธ์˜ _id ํ•„๋“œ๋ฅผ ์ˆ˜์ •ํ•˜๋Š” ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์ด ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ต๋‹ˆ๋‹ค.

ํŒ

MongoDB๋Š” mongosh ํ™•์žฅ์ธ "์Šค๋‹ˆํŽซ"์„ ์ œ๊ณตํ•˜์—ฌ 16์ง„์ˆ˜๋กœ ์ธ์ฝ”๋”ฉ๋œ ์žฌ๊ฐœ ํ† ํฐ์„ ๋””์ฝ”๋”ฉํ•ฉ๋‹ˆ๋‹ค.

์—์„œ resumetoken ์Šค๋‹ˆํŽซ์„ ์„ค์น˜ํ•˜๊ณ  ์‹คํ–‰ํ•  ์ˆ˜ mongosh ์žˆ์Šต๋‹ˆ๋‹ค.

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

์‹œ์Šคํ…œ์— npm(์ด)๊ฐ€ ์„ค์น˜๋˜์–ด ์žˆ๋Š” ๊ฒฝ์šฐ ๋ช…๋ น์ค„์—์„œ (mongosh(์„)๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ )resumetoken์„ ์‹คํ–‰ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

npx mongodb-resumetoken-decoder <RESUME TOKEN>

์ž์„ธํ•œ ๋‚ด์šฉ์€ ๋‹ค์Œ์„ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ณ€๊ฒฝ ์ด๋ฒคํŠธ ์•Œ๋ฆผ์€ _id ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค.

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

aggregate ๋ช…๋ น์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ $changeStream ์ง‘๊ณ„ ๋‹จ๊ณ„์—์„œ cursor.postBatchResumeToken ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore ๋ช…๋ น์€ cursor.postBatchResumeToken ํ•„๋“œ์— ์žฌ๊ฐœ ํ† ํฐ์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค.

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ์ด ์ง€์†๋˜๋ฉด ๋‹ค์šด์ŠคํŠธ๋ฆผ ์‹œ์Šคํ…œ์— ์•Œ๋ ค์ฃผ๋ฏ€๋กœ ๋น„์ฆˆ๋‹ˆ์Šค ์‹œ์Šคํ…œ์— ์˜์กดํ•˜๋Š” ์•„ํ‚คํ…์ฒ˜์— ๋„์›€์ด ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๊ฐœ๋ฐœ์ž๊ฐ€ ์ถ”์ถœ, ๋ณ€ํ™˜ ๋ฐ ๋กœ๋“œ(ETL) ์„œ๋น„์Šค, ํ”Œ๋žซํผ ๊ฐ„ ๋™๊ธฐํ™”, ํ˜‘์—… ๊ธฐ๋Šฅ ๋ฐ ์•Œ๋ฆผ ์„œ๋น„์Šค๋ฅผ ๊ตฌํ˜„ํ•  ๋•Œ ์‹œ๊ฐ„์„ ์ ˆ์•ฝํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ž์ฒด ๊ด€๋ฆฌ ๋ฐฐํฌ์„œ๋ฒ„์—์„œ ์ธ์ฆ ๋ฐ ๊ถŒํ•œ ๋ถ€์—ฌ๋ฅผ ์ ์šฉํ•œ ๋ฐฐํฌ์˜ ๊ฒฝ์šฐ ๋‹ค์Œ์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

  • ํŠน์ • ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด๋ ค๋ฉด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ํ•ด๋‹น ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•ด changeStream ๋ฐ find ์กฐ์น˜๋ฅผ ๋ถ€์—ฌํ•  ์ˆ˜ ์žˆ๋Š” ๊ถŒํ•œ์ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • ๋‹จ์ผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด๋ ค๋ฉด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ๋ชจ๋“  ๋น„system ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•ด changeStream๋ฐ find์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ๊ถŒํ•œ์ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • ์ „์ฒด ๋ฐฐํฌ์—์„œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์„ ์—ด๋ ค๋ฉด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋ฐฐํฌ์˜ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋Œ€ํ•ดsystem ์ด ์•„๋‹Œ ๋ชจ๋“  collection์— ๋Œ€ํ•ด changeStream ๋ฐ find ์กฐ์น˜๋ฅผ ํ—ˆ์šฉํ•˜๋Š” ๊ถŒํ•œ์ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋ณต์ œ๋ณธ ์ง‘ํ•ฉ์— ์žˆ๋Š” ๋Œ€๋ถ€๋ถ„์˜ ๋ฐ์ดํ„ฐ ๋ณด์œ  ๋ฉค๋ฒ„์— ์ง€์†๋œ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ๋‚ด์šฉ์— ๋Œ€ํ•ด์„œ๋งŒ ์•Œ๋ฆฝ๋‹ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•˜๋ฉด ์˜ค๋ฅ˜ ์‹œ๋‚˜๋ฆฌ์˜ค์—์„œ ์ง€์†์ ์ธ ๋Œ€๋‹ค์ˆ˜ ์ปค๋ฐ‹ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์— ๋Œ€ํ•ด์„œ๋งŒ ์•Œ๋ฆผ์ด ํŠธ๋ฆฌ๊ฑฐ๋ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด ํ”„๋ผ์ด๋จธ๋ฆฌ์— ๋Œ€ํ•ด ์—ด๋ฆฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ปค์„œ๊ฐ€ ์žˆ๋Š” 3-๋…ธ๋“œ ๋ณต์ œ๋ณธ ์„ธํŠธ๊ฐ€ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ด ๋ด…์‹œ๋‹ค. ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์‚ฝ์ž… ์—ฐ์‚ฐ์„ ์‹คํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ํ•ด๋‹น ์‚ฝ์ž…์ด ๋Œ€๋ถ€๋ถ„์˜ ๋ฐ์ดํ„ฐ ๋ณด์œ  ๋…ธ๋“œ์— ์ง€์†๋œ ๊ฒฝ์šฐ์—๋งŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ์„ ์•Œ๋ฆฝ๋‹ˆ๋‹ค.

์ž‘์—…์ด ํŠธ๋žœ์žญ์…˜๊ณผ ์—ฐ๊ฒฐ๋œ ๊ฒฝ์šฐ, ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ ๋ฌธ์„œ์— txnNumber ๋ฐ lsid๊ฐ€ ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์€ ๋ช…์‹œ์ ์ธ ๋ฐ์ดํ„ฐ ์ •๋ ฌ์ด ์ œ๊ณต๋˜์ง€ ์•Š๋Š” ํ•œ simple ์ด์ง„ ๋น„๊ต๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

MongoDB 5.3๋ถ€ํ„ฐ๋Š” ๋ฒ”์œ„ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์ค‘์— ๊ณ ์•„ ๋ฌธ์„œ์— ๋Œ€ํ•œ ์—…๋ฐ์ดํŠธ์— ๋Œ€ํ•œ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ๊ฐ€ ์ƒ์„ฑ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

MongoDB 6.0๋ถ€ํ„ฐ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฌธ์„œ์˜ ๋ณ€๊ฒฝ ์ „ํ›„ ๋ฒ„์ „(๋ฌธ์„œ์˜ ์ „ํ›„ ์ด๋ฏธ์ง€)์„ ์ถœ๋ ฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€๋Š” ๋ฌธ์„œ๊ฐ€ ๊ต์ฒด, ์—…๋ฐ์ดํŠธ ๋˜๋Š” ์‚ญ์ œ๋˜๊ธฐ ์ „์˜ ๋ฌธ์„œ์ž…๋‹ˆ๋‹ค. ์‚ฝ์ž…๋œ ๋ฌธ์„œ์—๋Š” ์‚ฌ์ „ ์ด๋ฏธ์ง€๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.

  • ์‚ฌํ›„ ์ด๋ฏธ์ง€๋Š” ๋ฌธ์„œ๊ฐ€ ์‚ฝ์ž…, ๊ต์ฒด, ์—…๋ฐ์ดํŠธ๋œ ํ›„์˜ ๋ฌธ์„œ์ž…๋‹ˆ๋‹ค. ์‚ญ์ œ๋œ ๋ฌธ์„œ์— ๋Œ€ํ•œ ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.

  • db.createCollection(), create ๋˜๋Š” collMod์„(๋ฅผ) ์‚ฌ์šฉํ•˜๋Š” ์ปฌ๋ ‰์…˜์— ๋Œ€ํ•ด changeStreamPreAndPostImages์„(๋ฅผ) ํ™œ์„ฑํ™”ํ•˜์„ธ์š”.

์ด๋ฏธ์ง€๊ฐ€ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ์— ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

  • ๋ฌธ์„œ ์—…๋ฐ์ดํŠธ ๋˜๋Š” ์‚ญ์ œ ์ž‘์—… ์‹œ collection์—์„œ ํ™œ์„ฑํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.

  • expireAfterSeconds์—์„œ ์ „ํ›„ ์ด๋ฏธ์ง€ ๋ณด์กด ์‹œ๊ฐ„ ์„ค์ • ์ดํ›„์— ์ œ๊ฑฐ๋ฉ๋‹ˆ๋‹ค.

    • ๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” ์ „์ฒด ํด๋Ÿฌ์Šคํ„ฐ์—์„œ expireAfterSeconds๋ฅผ 100์ดˆ๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • ๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” expireAfterSeconds ๋“ฑ ํ˜„์žฌ changeStreamOptions ์„ค์ •์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds๋ฅผ off๋กœ ์„ค์ •ํ•˜๋ฉด ๊ธฐ๋ณธ ๋ณด์กด ์ •์ฑ…์ด ์‚ฌ์šฉ๋˜๋ฉฐ, ํ•ด๋‹น ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ๊ฐ€ oplog์—์„œ ์ œ๊ฑฐ๋  ๋•Œ๊นŒ์ง€ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๊ฐ€ ๋ณด์กด๋ฉ๋‹ˆ๋‹ค.

    • ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ๊ฐ€ oplog์—์„œ ์ œ๊ฑฐ๋˜๋ฉด expireAfterSeconds ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€ ๋ณด์กด ์‹œ๊ฐ„์— ๊ด€๊ณ„์—†์ด ํ•ด๋‹น ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋„ ์‚ญ์ œ๋ฉ๋‹ˆ๋‹ค.

์ถ”๊ฐ€ ๊ณ ๋ ค ์‚ฌํ•ญ

  • ์ „ํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•˜๋ฉด ์ €์žฅ ๊ณต๊ฐ„์ด ์†Œ๋ชจ๋˜๊ณ  ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ๋Š˜์–ด๋‚ฉ๋‹ˆ๋‹ค. ํ•„์š”ํ•œ ๊ฒฝ์šฐ์—๋งŒ ์ „ํ›„ ์ด๋ฏธ์ง€๋ฅผ ํ™œ์„ฑํ™”ํ•˜์„ธ์š”.

  • ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ ํฌ๊ธฐ๋ฅผ 16๋ฉ”๋น„๋ฐ”์ดํŠธ ๋ฏธ๋งŒ์œผ๋กœ ์ œํ•œํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฒคํŠธ ํฌ๊ธฐ๋ฅผ ์ œํ•œํ•˜๋ ค๋ฉด ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

    • ๋ฌธ์„œ ํฌ๊ธฐ๋ฅผ 8๋ฉ”๊ฐ€๋ฐ”์ดํŠธ๋กœ ์ œํ•œํ•ฉ๋‹ˆ๋‹ค. updateDescription๊ณผ ๊ฐ™์€ ๋‹ค๋ฅธ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ ํ•„๋“œ๊ฐ€ ํฌ์ง€ ์•Š์€ ๊ฒฝ์šฐ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์—์„œ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ๋™์‹œ์— ์š”์ฒญํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • updateDescription๊ณผ ๊ฐ™์€ ๋‹ค๋ฅธ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ด๋ฒคํŠธ ํ•„๋“œ๊ฐ€ ํฌ์ง€ ์•Š์€ ๊ฒฝ์šฐ, ์ตœ๋Œ€ 16๋ฉ”๋น„๋ฐ”์ดํŠธ ๋ฌธ์„œ์— ๋Œ€ํ•ด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์—์„œ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋งŒ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.

    • ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฝ์šฐ ์ตœ๋Œ€ 16๋ฉ”๋น„๋ฐ”์ดํŠธ์˜ ๋ฌธ์„œ์— ๋Œ€ํ•ด ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ ์ถœ๋ ฅ์—์„œ ์‚ฌ์ „ ์ด๋ฏธ์ง€๋งŒ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.

      • ๋ฌธ์„œ ์—…๋ฐ์ดํŠธ๊ฐ€ ๋ฌธ์„œ ๊ตฌ์กฐ๋‚˜ ๋‚ด์šฉ์˜ ์ž‘์€ ๋ถ€๋ถ„์—๋งŒ ์˜ํ–ฅ์„ ๋ฏธ์นฉ๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ 

      • replace ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค์ง€ ์•Š์Šต๋‹ˆ๋‹ค. replace ์ด๋ฒคํŠธ์—๋Š” ํ•ญ์ƒ ํ›„์ด๋ฏธ์ง€๊ฐ€ ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€๋ฅผ ์š”์ฒญํ•˜๋ ค๋ฉด db.collection.watch()์—์„œ fullDocumentBeforeChange๋ฅผ required ๋˜๋Š” whenAvailable๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค. ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ์š”์ฒญํ•˜๋ ค๋ฉด ๋™์ผํ•œ ๋ฐฉ๋ฒ•์œผ๋กœ fullDocument๋ฅผ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

  • ์‚ฌ์ „ ์ด๋ฏธ์ง€๊ฐ€ config.system.preimages ์ปฌ๋ ‰์…˜์— ๊ธฐ๋ก๋ฉ๋‹ˆ๋‹ค.

    • config.system.preimages collection์€ ์ปค์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. collection ํฌ๊ธฐ๋ฅผ ์ œํ•œํ•˜๋ ค๋ฉด ์•ž์„œ ํ‘œ์‹œ๋œ ๋Œ€๋กœ ์‚ฌ์ „ ์ด๋ฏธ์ง€์— ๋Œ€ํ•ด expireAfterSeconds ์‹œ๊ฐ„์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • ์‚ฌ์ „ ์ด๋ฏธ์ง€๋Š” ๋ฐฑ๊ทธ๋ผ์šด๋“œ ํ”„๋กœ์„ธ์Šค๊ฐ€ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ œ๊ฑฐํ•ฉ๋‹ˆ๋‹ค.

์ค‘์š”

์ด์ „ ๋ฒ„์ „๊ณผ ํ˜ธํ™˜๋˜์ง€ ์•Š๋Š” ๊ธฐ๋Šฅ

MongoDB 6.0๋ถ€ํ„ฐ๋Š” ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผ์— ๋ฌธ์„œ ์‚ฌ์ „ ๋ฐ ์‚ฌํ›„ ์ด๋ฏธ์ง€๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ์ด์ „ MongoDB ๋ฒ„์ „์œผ๋กœ ๋‹ค์šด๊ทธ๋ ˆ์ด๋“œํ•˜๊ธฐ ์ „์— collMod ๋ช…๋ น์„ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ collection์— ๋Œ€ํ•ด changeStreamPreAndPostImages๋ฅผ ๋น„ํ™œ์„ฑํ™”ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ๋„ ์ฐธ์กฐํ•˜์„ธ์š”.

๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผs ์ถœ๋ ฅ์— ๋Œ€ํ•œ ์ „์ฒด ์˜ˆ์‹œ๋Š” ์ „ํ›„ ์ด๋ฏธ์ง€๋ฅผ ํฌํ•จํ•˜๋Š” ๋ฌธ์„œ์˜ ๋ณ€๊ฒฝ ์ŠคํŠธ๋ฆผs๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.