๋ณ๊ฒฝ ์คํธ๋ฆผ
์ด ํ์ด์ง์ ๋ด์ฉ
- ๊ฐ์ฉ์ฑ
- ์ฐ๊ฒฐ
- ์ปฌ๋ ์ , ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋๋ ๋ฐฐํฌ ๋ณด๊ธฐ
- ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ฑ๋ฅ ๊ณ ๋ ค ์ฌํญ
- ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ
- ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ ์์
- ์ ๋ฐ์ดํธ ์์ ์ ๋ํ ์ ์ฒด ๋ฌธ์ ์กฐํ
- ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ค์ ์์
- ์ฌ์ฉ ์ฌ๋ก
- ์ก์ธ์ค ์ ์ด
- ์ด๋ฒคํธ ์๋ฆผ
- ๋ฐ์ดํฐ ์ ๋ ฌ
- Change Streams ๋ฐ ๊ณ ์ ๋ฌธ์
- ์ ํ ์ด๋ฏธ์ง๋ฅผ ํฌํจํ๋ ๋ฌธ์์ Change Streams
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ํตํด ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฌ์ ์ ๋ณต์กํ ๋ฐฉ์ ๋ฐ ์๋์ผ๋ก oplog๋ฅผ ํ ์ผ๋งํ๋ ์ํ ์์ด ์ค์๊ฐ ๋ฐ์ดํฐ ๋ณ๊ฒฝ์ ์ก์ธ์คํ ์ ์์ต๋๋ค. ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฌ์ฉํ์ฌ ๋จ์ผ ์ปฌ๋ ์ , ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋๋ ์ ์ฒด ๋ฐฐํฌ์ ๋ชจ๋ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ฌํญ์ ๊ตฌ๋ ํ๊ณ ์ด์ ์ฆ์ ๋์ํ ์ ์์ต๋๋ค. ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ง๊ณ ํ๋ ์์ํฌ๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ์ ํ๋ฆฌ์ผ์ด์ ์์ ํน์ ๋ณ๊ฒฝ ์ฌํญ์ ํํฐ๋งํ๊ฑฐ๋ ์๋ฆผ์ ๋ง์๋๋ก ๋ณํํ ์๋ ์์ต๋๋ค.
MongoDB 5.1๋ถํฐ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์ต์ ํ๋์ด ๋ ํจ์จ์ ์ธ ๋ฆฌ์์ค ์ฌ์ฉ๋ฅ ๊ณผ ์ผ๋ถ ์ง๊ณ ํ์ดํ๋ผ์ธ ๋จ๊ณ์ ๋ ๋น ๋ฅธ ์คํ์ ์ ๊ณตํฉ๋๋ค.
๊ฐ์ฉ์ฑ
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ณต์ ๋ณธ ์ธํธ ๋ฐ ์ค๋ฉ๋ ํด๋ฌ์คํฐ์ ์ฌ์ฉํ ์ ์์ต๋๋ค:
์คํ ๋ฆฌ์ง ์์ง
๋ณต์ ๋ณธ ์ธํธ์ ์ค๋ฉ๋ ํด๋ฌ์คํฐ๋ WiredTiger ์คํ ๋ฆฌ์ง ์์ง์ ์ฌ์ฉํด์ผ ํฉ๋๋ค. Change Stream์ MongoDB์ ๋ฏธ์ฌ์ฉ ๋ฐ์ดํฐ ์ํธํ ๊ธฐ๋ฅ์ ์ฌ์ฉํ๋ ๋ฐฐํฌ์์๋ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ณต์ ๋ณธ ์ธํธ ํ๋กํ ์ฝ ๋ฒ์ .
๋ณต์ ๋ณธ ์ธํธ์ ์ค๋ฉ๋ ํด๋ฌ์คํฐ๋ ๋ณต์ ๋ณธ ์ธํธ ํ๋กํ ์ฝ ๋ฒ์ 1(
pv1
)๋ฅผ ์ฌ์ฉํด์ผ ํฉ๋๋ค.์ฝ๊ธฐ ๊ณ ๋ ค '๋๋ค์' ํ์ฑํ
๋ณ๊ฒฝ ์คํธ๋ฆผ ์
"majority"
์ฝ๊ธฐ ๊ณ ๋ ค ์ง์์ ๊ด๊ณ์์ด ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ฆ, ์ฝ๊ธฐ ๊ณ ๋ คmajority
์ง์์ ํ์ฑํ(๊ธฐ๋ณธ๊ฐ)ํ๊ฑฐ๋ ๋นํ์ฑํ ํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
Stable API ์ง์
๋ณ๊ฒฝ ์คํธ๋ฆผ์ Stable API V1 ์ ํฌํจ๋์ด ์์ต๋๋ค. ๊ทธ๋ฌ๋ showExpandedEvents ์ต์ ์ Stable API V1 ์ ํฌํจ๋์ด ์์ง ์์ต๋๋ค.
์ฐ๊ฒฐ
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ํ ์ฐ๊ฒฐ์ +srv
์ฐ๊ฒฐ ์ต์
๊ณผ ํจ๊ป DNS ์๋ ๋ชฉ๋ก์ ์ฌ์ฉํ๊ฑฐ๋ ์ฐ๊ฒฐ ๋ฌธ์์ด์ ์๋ฒ๋ฅผ ๊ฐ๋ณ์ ์ผ๋ก ๋์ดํ์ฌ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋๋ผ์ด๋ฒ๊ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ํ ์ฐ๊ฒฐ์ ์๊ฑฐ๋ ์ฐ๊ฒฐ์ด ๋ค์ด๋๋ฉด ํด๋ฌ์คํฐ ๋ด ์ฝ๊ธฐ ์ค์ ์ด ์ผ์นํ๋ ๋ค๋ฅธ ๋ ธ๋๋ฅผ ํตํด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ํ ์ฐ๊ฒฐ์ ๋ค์ ์ค์ ํ๋ ค๊ณ ์๋ํฉ๋๋ค. ๋๋ผ์ด๋ฒ๊ฐ ์ฌ๋ฐ๋ฅธ ์ฝ๊ธฐ ์ค์ ์ ๊ฐ์ง ๋ ธ๋๋ฅผ ์ฐพ์ ์ ์์ผ๋ฉด ์์ธ๊ฐ ๋ฐ์ํฉ๋๋ค.
์์ธํ ๋ด์ฉ์ ์ฐ๊ฒฐ ๋ฌธ์์ด URI ํ์์ ์ฐธ์กฐํ์ธ์.
์ปฌ๋ ์ , ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋๋ ๋ฐฐํฌ ๋ณด๊ธฐ
๋ค์์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด ์ ์์ต๋๋ค.
๋์ | ์ค๋ช
|
---|---|
์ปฌ๋ ์ | ๋จ์ผ ์ปฌ๋ ์
( ์ด ํ์ด์ง์ ์์์์๋ MongoDB ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ์ฌ ๋จ์ผ ์ปฌ๋ ์
์ ๋ํด ์ปค์๋ฅผ ์ด๊ณ ์์
ํฉ๋๋ค. |
๋ฐ์ดํฐ๋ฒ ์ด์ค | ๋จ์ผ ๋ฐ์ดํฐ๋ฒ ์ด์ค( MongoDB ๋๋ผ์ด๋ฒ ๋ฉ์๋์ ๋ํด์๋ ๋๋ผ์ด๋ฒ ์ค๋ช
์๋ฅผ ์ฐธ์กฐํ์ธ์. |
๋ฐฐํฌ | ๋ฐฐํฌ(๋ณต์ ๋ณธ ์ธํธ ๋๋ ์ค๋ฉ๋ ํด๋ฌ์คํฐ)์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ฅผ ์ด์ด MongoDB ๋๋ผ์ด๋ฒ ๋ฉ์๋์ ๋ํด์๋ ๋๋ผ์ด๋ฒ ์ค๋ช
์๋ฅผ ์ฐธ์กฐํ์ธ์. |
์ฐธ๊ณ
๋ณ๊ฒฝ ์คํธ๋ฆผ ์์
์ด ํ์ด์ง์ ์์์์๋ ์ปฌ๋ ์ ์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ฅผ ์ด๊ณ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ก ์์ ํ๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํ๊ธฐ ์ํด MongoDB ๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํฉ๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ฑ๋ฅ ๊ณ ๋ ค ์ฌํญ
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ํด ์ด๋ ค ์๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ด ์ฐ๊ฒฐ ํ ํฌ๊ธฐ๋ฅผ ์ด๊ณผํ ๊ฒฝ์ฐ ์๋ฆผ ์ง์ฐ์ด ๋ฐ์ํ ์ ์์ต๋๋ค. ๊ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ค์ ์ด๋ฒคํธ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์๊ฐ ๋์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ํ ์ฐ๊ฒฐ๊ณผ GetMore ์์ ์ ์ฌ์ฉํฉ๋๋ค. ์ง์ฐ ์๊ฐ ๋ฌธ์ ๋ฅผ ๋ฐฉ์งํ๋ ค๋ฉด ํ ํฌ๊ธฐ๊ฐ ์ด๋ ค ์๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์๋ณด๋ค ํฐ์ง ํ์ธํด์ผ ํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ MaxPoolSize ์ค์ ์ ์ฐธ์กฐํ์ธ์.
์ค๋ฉ๋ ํด๋ฌ์คํฐ ๊ณ ๋ ค ์ฌํญ
์ค๋ฉ๋ ํด๋ฌ์คํฐ์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์ด๋ฆฌ๋ ๊ฒฝ์ฐ:
mongos
๋ ๊ฐ ์ค๋์์ ๊ฐ๋ณ์ ์ธ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ฑํฉ๋๋ค. ์ด ๋์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ํน์ ์ค๋ ํค ๋ฒ์๋ฅผ ๋์์ผ๋ก ํ๋์ง ์ฌ๋ถ์ ๊ด๊ณ์์ด ๋ฐ์ํฉ๋๋ค.mongos
์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ ๊ฒฐ๊ณผ๋ฅผ ์์ ํ๋ฉด ํด๋น ๊ฒฐ๊ณผ๊ฐ ์ ๋ ฌ ๋ฐ ํํฐ๋ง๋ฉ๋๋ค. ํ์ํ ๊ฒฝ์ฐmongos
์์fullDocument
์กฐํ๋ ์ํํฉ๋๋ค.
์ต์์ ์ฑ๋ฅ์ ์ป์ผ๋ ค๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ์์ $lookup
์ฟผ๋ฆฌ ์ฌ์ฉ์ ์ ํํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด๋ ค๋ฉด ๋ค์์ ์ํํฉ๋๋ค.
๋ณต์ ๋ณธ ์ธํธ์ ๊ฒฝ์ฐ ๋ฐ์ดํฐ ๋ณด์ ๋ฉค๋ฒ ์ค ํ๋์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ ์์ ์ ์คํํ ์ ์์ต๋๋ค.
์ค๋ฉ๋ ํด๋ฌ์คํฐ์ ๊ฒฝ์ฐ
mongos
์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๊ธฐ ์์ ์ ์คํํด์ผ ํฉ๋๋ค.
๋ค์ ์์๋ ์ปฌ๋ ์ ์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด๊ณ ์ปค์๋ฅผ ๋ฐ๋ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ฌธ์๋ฅผ ์กฐํํฉ๋๋ค. [1]
โค ์ค๋ฅธ์ชฝ ์๋จ์ ์ธ์ด ์ ํ ๋๋กญ๋ค์ด ๋ฉ๋ด๋ฅผ ์ฌ์ฉํ์ฌ ์ด ํ์ด์ง์ ์๋ ์์์ ์ธ์ด๋ฅผ ์ค์ ํ์ธ์.
์๋์ C ์์ ์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ ์ ์ฐ๊ฒฐํ๊ณ ์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ก์ธ์คํ๋ค๊ณ inventory
๊ฐ์ ํฉ๋๋ค.
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
์๋์ C# ์์ ์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ ์ ์ฐ๊ฒฐํ๊ณ ์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ์ก์ธ์คํ๋ค๊ณ inventory
๊ฐ์ ํฉ๋๋ค.
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
์๋ Go ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
์๋ Java ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์ค ํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
์๋์ ์ฝํ๋ฆฐ (Kotlin) ์์ ์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ ์ ์ฐ๊ฒฐ๋์ด ์๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ์ก์ธ์ค ํ ์ ์๋ค๊ณ ๊ฐ์ ํฉ๋๋ค. ์ด๋ฌํ ์์
์ ์๋ฃํ๋ ๋ฐฉ๋ฒ์ ํ์ต ๋ณด๋ ค๋ฉด ์ฝํ๋ฆฐ (Kotlin) ๋ฃจํด ๋๋ผ์ด๋ฒ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์ปฌ๋ ์
๊ฐ์ด๋ ๋ฅผ ์ฐธ์กฐํ์ธ์.
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
์๋ ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ ํฌํจํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์ค ํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
cursor = db.inventory.watch() document = await cursor.next()
์๋ Node.js ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
๋ค์ ์๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ๋ ์์์ ๋๋ค.
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream .on('change', next => { // process next document }) .once('error', () => { // handle error });
๋๋ ๋ฐ๋ณต๊ธฐ๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ ์๋ ์์ต๋๋ค.
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
ChangeStream์ EventEmitter๋ฅผ ํ์ฅํฉ๋๋ค.
์๋ ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
์๋ Python ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
cursor = db.inventory.watch() next(cursor)
์๋ ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
cursor = inventory.watch.to_enum next_change = cursor.next
์๋ Swift (๋น๋๊ธฐ) ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
์๋ Swift(๋๊ธฐ) ์์์์๋ MongoDB ๋ณต์ ๋ณธ ์ธํธ์ ์ฐ๊ฒฐํ๊ณ inventory
์ปฌ๋ ์
์ด ํฌํจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ก์ธ์คํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
์ปค์์์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์กฐํํ๋ ค๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ฅผ ๋ฐ๋ณตํฉ๋๋ค. ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ์ ๋ํ ์์ธํ ๋ด์ฉ์ Change Events๋ฅผ ์ฐธ์กฐํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ ๋ค์ ์ค ํ๋๊ฐ ๋ฐ์ํ ๋๊น์ง ์ด๋ ค ์์ต๋๋ค.
์ปค์๊ฐ ๋ช ์์ ์ผ๋ก ๋ซํ๋๋ค.
๋ฌดํจํ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์ปฌ๋ ์ ์ ๊ฑฐ ๋๋ ์ด๋ฆ ๋ฐ๊พธ๊ธฐ๊ฐ ์์ต๋๋ค.
MongoDB ๋ฐฐํฌ์ ๋ํ ์ฐ๊ฒฐ์ด ๋ซํ๊ฑฐ๋ ์๊ฐ ์ด๊ณผ๋ฉ๋๋ค. ์์ธํ ๋ด์ฉ์ Cursor Behaviors๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
๋ฐฐํฌ๊ฐ ์ค๋ฉ๋ ํด๋ฌ์คํฐ์ธ ๊ฒฝ์ฐ ์ค๋๋ฅผ ์ ๊ฑฐํ๋ฉด ์ด๋ ค ์๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๊ฐ ๋ซํ ์ ์์ต๋๋ค. ๋ซํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๋ ์์ ํ ์ฌ๊ฐ๋์ง ์์ ์ ์์ต๋๋ค.
์ฐธ๊ณ
๋ซํ์ง ์์ ์ปค์์ ๋ผ์ดํ์ฌ์ดํด์ ์ธ์ด์ ๋ฐ๋ผ ๋ค๋ฆ ๋๋ค.
[1] | startAtOperationTime ์ ์ง์ ํ์ฌ ํน์ ์์ ์ ์ปค์๋ฅผ ์ด ์ ์์ต๋๋ค. ์ง์ ๋ ์์์ ์ด ๊ณผ๊ฑฐ์ธ ๊ฒฝ์ฐ oplog์ ์๊ฐ ๋ฒ์ ๋ด์ ์์ด์ผ ํฉ๋๋ค. |
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ ์์
โค ์ค๋ฅธ์ชฝ ์๋จ์ ์ธ์ด ์ ํ ๋๋กญ๋ค์ด ๋ฉ๋ด๋ฅผ ์ฌ์ฉํ์ฌ ์ด ํ์ด์ง์ ์๋ ์์์ ์ธ์ด๋ฅผ ์ค์ ํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
๋ชฉ๋ก์๋ ๋ค์ ๊ธฐ์ค ์ค ํ๋ ๋๋ ๋ ๋ค๋ฅผ ์ถฉ์กฑํ๋ ์์
์ ํํฐ๋งํ๋ ๋จ์ผ $match
๋จ๊ณ๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
username
๊ฐ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.alice
operationType
๊ฐ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.delete
pipeline
์ watch()
๋ฉ์๋์ ์ ๋ฌํ๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์ง์ ๋ pipeline
์ ํตํด ์๋ฆผ์ ์ ๋ฌํ ํ ์๋ฆผ์ ๋ฐํํ๋๋ก ์ง์ํฉ๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
pipeline
๋ชฉ๋ก์๋ ๋ค์ ๊ธฐ์ค ์ค ํ๋ ๋๋ ๋ ๋ค๋ฅผ ์ถฉ์กฑํ๋ ์์
์ ํํฐ๋งํ๋ ๋จ์ผ $match
๋จ๊ณ๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
username
๊ฐ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.alice
operationType
๊ฐ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.delete
pipeline
์ watch()
๋ฉ์๋์ ์ ๋ฌํ๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์ง์ ๋ pipeline
์ ํตํด ์๋ฆผ์ ์ ๋ฌํ ํ ์๋ฆผ์ ๋ฐํํ๋๋ก ์ง์ํฉ๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
๋ค์ ์๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ๋ ์์์ ๋๋ค.
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
๋๋ ๋ฐ๋ณต๊ธฐ๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ ์๋ ์์ต๋๋ค.
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ๋ ๋ค์ ํ์ดํ๋ผ์ธ ๋จ๊ณ ์ค ํ๋ ์ด์์ ๋ฐฐ์ด์ ์ ๊ณตํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์ ์ ์ดํ ์ ์์ต๋๋ค.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
ํ
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ๋ฌธ์์ _id ํ๋๋ ์ฌ๊ฐ ํ ํฐ ์ญํ ์ ํฉ๋๋ค. ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ์ _id
ํ๋๋ฅผ ์์ ํ๊ฑฐ๋ ์ ๊ฑฐํ๊ธฐ ์ํด ํ์ดํ๋ผ์ธ์ ์ฌ์ฉํ์ง ๋ง์ธ์.
MongoDB 4.2๋ถํฐ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ง๊ณ ํ์ดํ๋ผ์ธ์ด ์ด๋ฒคํธ์ _id ํ๋๋ฅผ ์์ ํ๋ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์์ธ๋ฅผ ๋ฐ์์ํต๋๋ค.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์๋ต ๋ฌธ์ ํ์์ ๋ํ ์์ธํ ๋ด์ฉ์ Change Events๋ฅผ ์ฐธ์กฐํ์ธ์.
์ ๋ฐ์ดํธ ์์ ์ ๋ํ ์ ์ฒด ๋ฌธ์ ์กฐํ
๊ธฐ๋ณธ์ ์ผ๋ก ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ ๋ฐ์ดํธ ์์ ์ค์ ํ๋์ ๋ธํ๋ง ๋ฐํํฉ๋๋ค. ๊ทธ๋ฌ๋ ์ ๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ ๊ณผ๋ฐ์ ์ปค๋ฐ ๋ฒ์ ์ ๋ฐํํ๋๋ก ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
โค ์ค๋ฅธ์ชฝ ์๋จ์ ์ธ์ด ์ ํ ๋๋กญ๋ค์ด ๋ฉ๋ด๋ฅผ ์ฌ์ฉํ์ฌ ์ด ํ์ด์ง์ ์๋ ์์์ ์ธ์ด๋ฅผ ์ค์ ํ์ธ์.
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ์ต์ ๋ค์ ์ปค๋ฐ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด "updateLookup"
๊ฐ๊ณผ ํจ๊ป "fullDocument"
์ต์
์ mongoc_collection_watch
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ fullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
์ db.collection.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ FullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ณผ๋ฐ์ ์ปค๋ฐ๋ ์ต์ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด SetFullDocument(options.UpdateLookup)
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ต์
์ ์ฌ์ฉํ์ธ์.
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด FullDocument.UPDATE_LOOKUP
์(๋ฅผ) {3} ๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ FullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
์
๋ฐ์ดํธ๋ ๋ฌธ์ ์ ๊ฐ์ฅ ์ต๊ทผ ๋ค์ FullDocument.UPDATE_LOOKUP
์ปค๋ฐ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด ์ ChangeStreamFlow.fullDocument() ๋ฉ์๋.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ FullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด full_document='updateLookup'
์ db.collection.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ `full_document
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด { fullDocument: 'updateLookup' }
์ db.collection.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ fullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
๋ค์ ์๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ๋ ์์์ ๋๋ค.
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
๋๋ ๋ฐ๋ณต๊ธฐ๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ํ๋ก์ธ์คํ ์๋ ์์ต๋๋ค.
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
์ db.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ fullDocument
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด full_document='updateLookup'
์ db.collection.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ full_document
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด full_document: 'updateLookup'
์ db.watch()
๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
์๋ ์์์ ๋ชจ๋ ์
๋ฐ์ดํธ ์์
์๋ฆผ์๋ ์
๋ฐ์ดํธ ์์
์ ์ํฅ์ ๋ฐ๋ ๋ฌธ์์ ํ์ฌ ๋ฒ์ ์ ๋ํ๋ด๋ full_document
ํ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด options:
ChangeStreamOptions(fullDocument: .updateLookup)
์(๋ฅผ) {3} ๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
์
๋ฐ์ดํธ๋ ๋ฌธ์์ ๊ฐ์ฅ ์ต๊ทผ์ ์ปค๋ฐ๋ ๋ฒ์ ์ ๋ฐํํ๋ ค๋ฉด options:
ChangeStreamOptions(fullDocument: .updateLookup)
์(๋ฅผ) {3} ๋ฉ์๋์ ์ ๋ฌํฉ๋๋ค.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
์ฐธ๊ณ
์ ๋ฐ์ดํธ ์์ ํ, ์กฐํ ์ ์ ์ ๋ฐ์ดํธ๋ ๋ฌธ์๋ฅผ ์์ ํ ๋๋ค์ ์ปค๋ฐ ์์ ์ด ํ๋ ์ด์ ์๋ ๊ฒฝ์ฐ ๋ฐํ๋๋ ์ ์ฒด ๋ฌธ์๊ฐ ์ ๋ฐ์ดํธ ์์ ์์ ์ ๋ฌธ์์ ํฌ๊ฒ ๋ค๋ฅผ ์ ์์ต๋๋ค.
๊ทธ๋ฌ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ฌธ์์ ํฌํจ๋ ๋ธํ๋ ํญ์ ํด๋น ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ์ ์ ์ฉ๋ ๊ฐ์ ๋์ ์ปฌ๋ ์ ๋ณ๊ฒฝ ์ฌํญ์ ์ ํํ๊ฒ ์ค๋ช ํฉ๋๋ค.
๋ค์ ์ค ํ๋์ ํด๋นํ๋ ๊ฒฝ์ฐ ์
๋ฐ์ดํธ ์ด๋ฒคํธ์ fullDocument
ํ๋๊ฐ ๋๋ฝ๋ ์ ์์ต๋๋ค.
๋ฌธ์๊ฐ ์ญ์ ๋๊ฑฐ๋ ์ ๋ฐ์ดํธ์ ์กฐํ ์ฌ์ด์ ์ปฌ๋ ์ ์ด ์ญ์ ๋ ๊ฒฝ์ฐ.
์ ๋ฐ์ดํธ๋ก ์ธํด ํด๋น ์ปฌ๋ ์ ์ ์ค๋ ํค์ ์๋ ํ๋ ์ค ํ๋ ์ด์์ ๊ฐ์ด ๋ณ๊ฒฝ๋๋ ๊ฒฝ์ฐ.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์๋ต ๋ฌธ์ ํ์์ ๋ํ ์์ธํ ๋ด์ฉ์ Change Events๋ฅผ ์ฐธ์กฐํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ค์ ์์
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ปค์๋ฅผ ์ด ๋ ์ฌ๊ฐ ํ ํฐ์ resumeAfter ๋๋ startAfter๋ก ์ง์ ํ์ฌ ์ฌ๊ฐํ ์ ์์ต๋๋ค.
resumeAfter
Change Streams์ ๊ฒฝ์ฐ
์ปค์๋ฅผ ์ฌ๋ ์์ ์ ์ฌ๊ฐ(resume) ํ ํฐ์ resumeAfter
์ ์ ๋ฌํ์ฌ ํน์ ์ด๋ฒคํธ ์ดํ์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฌ๊ฐํ ์ ์์ต๋๋ค.
์ฌ๊ฐ ํ ํฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ฌ๊ฐ ํ ํฐ์ ์ฐธ์กฐํ์ธ์.
์ค์
ํ์์คํฌํ๊ฐ ๊ณผ๊ฑฐ์ ์์๋ ๊ฒฝ์ฐ, oplog์ ํ ํฐ ๋๋ ํ์์คํฌํ์ ๊ด๋ จ๋ ์์ ์ ์ฐพ์ ์ ์๋ ์ถฉ๋ถํ ๊ธฐ๋ก์ด ์์ด์ผ ํฉ๋๋ค.
๋ฌดํจํ ์ด๋ฒคํธ(์์: ์ปฌ๋ ์ ์ ๊ฑฐ ๋๋ ์ด๋ฆ ๋ฐ๊พธ๊ธฐ)๋ก ์ธํด ์คํธ๋ฆผ์ด ๋ซํ ํ์๋
resumeAfter
๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ค์ ์์ํ ์ ์์ต๋๋ค. ๋์ ๋ฌดํจํ ์ด๋ฒคํธ ํ startAfter๋ฅผ ์ฌ์ฉํ์ฌ ์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ํ ์ ์์ต๋๋ค.
์๋ ์์์๋ ์คํธ๋ฆผ์ด ์ญ์ ๋ ํ ์คํธ๋ฆผ์ ๋ค์ ์์ฑํ๊ธฐ ์ํด ์คํธ๋ฆผ ์ต์
์ resumeAfter
์ต์
์ด ์ถ๊ฐ๋ฉ๋๋ค. ๋ณ๊ฒฝ ์คํธ๋ฆผ์ _id
์ ์ ๋ฌํ๋ฉด ์ง์ ๋ ์์
์ดํ๋ถํฐ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๊ณ ์๋ํฉ๋๋ค.
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
์๋ ์์์ resumeToken
์ ๋ง์ง๋ง ๋ณ๊ฒฝ ์คํธ๋ฆผ ๋ฌธ์์์ ๊ฒ์๋์ด ์ต์
์ผ๋ก Watch()
๋ฉ์๋์ ์ ๋ฌ๋ฉ๋๋ค. resumeToken
์ Watch()
๋ฉ์๋์ ์ ๋ฌํ๋ฉด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
์ดํ๋ถํฐ ์๋ฆผ์ ๋ค์ ์์ํ๋๋ก ์๋ํฉ๋๋ค.
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ํ ์ฌ๊ฐ ํ ํฐ์ ์ง์ ํ ์ ์์ต๋๋ค. resumeAfter ์ต์
์ด ์ค์ ๋ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ๋ค์ ์์ํฉ๋๋ค. SetResumeAfter
๋ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ์ ์ทจํฉ๋๋ค. ์์: ์๋ ์์์์ resumeToken
.
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resumeAfter()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. resumeAfter()
๋ฉ์๋๋ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ(์: ์๋ ์์์ resumeToken
)์ ์ฌ์ฉํฉ๋๋ค.
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
ChangeStreamFlow.resumeAfter() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ ์๋ฆผ ์ ๋ค์ ์์ํฉ๋๋ค. resumeAfter()
๋ฉ์๋๋ ์๋ ์์ resumeToken
๋ณ์์ ๊ฐ์ด ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํด์๋์ด์ผ ํ๋ ๊ฐ์ ์ฌ์ฉํฉ๋๋ค.
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resume_after
์์ ์๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. resume_after
์์ ์๋ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธํด์ผ ํ๋ ๊ฐ์ ์ฌ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์๋ ์์์ resume_token
์(๋ฅผ) ๋งํฉ๋๋ค.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resumeAfter
์ต์
์ ์ฌ์ฉํ ์ ์์ต๋๋ค. resumeAfter
์ต์
์ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ์ ์ทจํฉ๋๋ค. ์์: ์๋ ์์์ resumeToken
.
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream .once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream .on('change', next => { processChange(next); }) .once('error', error => { // handle error }); }) .once('error', error => { // handle error });
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resumeAfter
์ต์
์ ์ฌ์ฉํ ์ ์์ต๋๋ค. resumeAfter
์ต์
์ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ์ ์ทจํฉ๋๋ค. ์์: ์๋ ์์์ $resumeToken
.
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resume_after
์์ ์๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. resume_after
์์ ์๋ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธํด์ผ ํ๋ ๊ฐ์ ์ฌ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์๋ ์์์ resume_token
์(๋ฅผ) ๋งํฉ๋๋ค.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resume_after
์์ ์๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. resume_after
์์ ์๋ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธํด์ผ ํ๋ ๊ฐ์ ์ฌ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์๋ ์์์ resume_token
์(๋ฅผ) ๋งํฉ๋๋ค.
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resumeAfter
์ต์
์ ์ฌ์ฉํ ์ ์์ต๋๋ค. resumeAfter
์ต์
์ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ์ ์ทจํฉ๋๋ค. ์์: ์๋ ์์์ resumeToken
.
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
์ฌ๊ฐ ํ ํฐ์ ์ง์ ๋ ์์
ํ์ ์๋ฆผ์ ์ฌ๊ฐํ๋ ค๋ฉด resumeAfter
์ต์
์ ์ฌ์ฉํ ์ ์์ต๋๋ค. resumeAfter
์ต์
์ ์ฌ๊ฐ ํ ํฐ์ผ๋ก ํ์ธ๋์ด์ผ ํ๋ ๊ฐ์ ์ทจํฉ๋๋ค. ์์: ์๋ ์์์ resumeToken
.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
Change Streams์ ๊ฒฝ์ฐ
์ปค์๋ฅผ ์ฌ๋ ์์ ์ ์ฌ๊ฐ(resume) ํ ํฐ์ startAfter
์ ์ ๋ฌํ์ฌ ํน์ ์ด๋ฒคํธ ์ดํ์ ์๋ก์ด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ํ ์ ์์ต๋๋ค. restartAfter ์ ๋ฌ๋ฆฌ startAfter
๋ ์๋ก์ด ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์์ฑํ์ฌ ๋ฌดํจํ ์ด๋ฒคํธ ํ์ ์๋ฆผ์ ์ฌ๊ฐํ ์ ์์ต๋๋ค.
์ฌ๊ฐ ํ ํฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ฌ๊ฐ ํ ํฐ์ ์ฐธ์กฐํ์ธ์.
์ค์
ํ์์คํฌํ๊ฐ ๊ณผ๊ฑฐ์ ์์๋ ๊ฒฝ์ฐ, oplog์ ํ ํฐ ๋๋ ํ์์คํฌํ์ ๊ด๋ จ๋ ์์ ์ ์ฐพ์ ์ ์๋ ์ถฉ๋ถํ ๊ธฐ๋ก์ด ์์ด์ผ ํฉ๋๋ค.
ํ ํฐ ์ฌ๊ฐ
์ฌ๊ฐ ํ ํฐ์ ์ฌ๋ฌ ์ถ์ฒ์์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
์์ค | ์ค๋ช
|
---|---|
๊ฐ ๋ณ๊ฒฝ ์ด๋ฒคํธ ์๋ฆผ์ | |
์ด ํ๋๋ | |
|
MongoDB 4.2๋ถํฐ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ง๊ณ ํ์ดํ๋ผ์ธ์ด ์ด๋ฒคํธ์ _id ํ๋๋ฅผ ์์ ํ๋ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ด ์์ธ๋ฅผ ๋ฐ์์ํต๋๋ค.
ํ
MongoDB๋ mongosh
ํ์ฅ์ธ "์ค๋ํซ"์ ์ ๊ณตํ์ฌ 16์ง์๋ก ์ธ์ฝ๋ฉ๋ ์ฌ๊ฐ ํ ํฐ์ ๋์ฝ๋ฉํฉ๋๋ค.
์์ resumetoken ์ค๋ํซ์ ์ค์นํ๊ณ ์คํํ ์ mongosh
์์ต๋๋ค.
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
์์คํ
์ npm
(์ด)๊ฐ ์ค์น๋์ด ์๋ ๊ฒฝ์ฐ ๋ช
๋ น์ค์์ (mongosh
(์)๋ฅผ ์ฌ์ฉํ์ง ์๊ณ )resumetoken์ ์คํํ ์๋ ์์ต๋๋ค.
npx mongodb-resumetoken-decoder <RESUME TOKEN>
์์ธํ ๋ด์ฉ์ ๋ค์์ ์ฐธ์กฐํ์ธ์.
mongosh
์์ ์ค๋ํซ์ ์ฌ์ฉํฉ๋๋ค.
๋ณ๊ฒฝ ์ด๋ฒคํธ์์ ํ ํฐ ์ฌ๊ฐ
๋ณ๊ฒฝ ์ด๋ฒคํธ ์๋ฆผ์ _id
ํ๋์ ์ฌ๊ฐ ํ ํฐ์ ํฌํจํฉ๋๋ค.
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
์์ ํ ํฐ ์ฌ๊ฐ aggregate
aggregate
๋ช
๋ น์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ $changeStream
์ง๊ณ ๋จ๊ณ์์ cursor.postBatchResumeToken
ํ๋์ ์ฌ๊ฐ ํ ํฐ์ ํฌํจํฉ๋๋ค:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
์์ ํ ํฐ ์ฌ๊ฐ getMore
getMore
๋ช
๋ น์ cursor.postBatchResumeToken
ํ๋์ ์ฌ๊ฐ ํ ํฐ์ ํฌํจํฉ๋๋ค.
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
์ฌ์ฉ ์ฌ๋ก
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ์ด ์ง์๋๋ฉด ๋ค์ด์คํธ๋ฆผ ์์คํ ์ ์๋ ค์ฃผ๋ฏ๋ก ๋น์ฆ๋์ค ์์คํ ์ ์์กดํ๋ ์ํคํ ์ฒ์ ๋์์ด ๋ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๊ฐ๋ฐ์๊ฐ ์ถ์ถ, ๋ณํ ๋ฐ ๋ก๋(ETL) ์๋น์ค, ํ๋ซํผ ๊ฐ ๋๊ธฐํ, ํ์ ๊ธฐ๋ฅ ๋ฐ ์๋ฆผ ์๋น์ค๋ฅผ ๊ตฌํํ ๋ ์๊ฐ์ ์ ์ฝํ ์ ์์ต๋๋ค.
์ก์ธ์ค ์ ์ด
์์ฒด ๊ด๋ฆฌ ๋ฐฐํฌ์๋ฒ์์ ์ธ์ฆ ๋ฐ ๊ถํ ๋ถ์ฌ๋ฅผ ์ ์ฉํ ๋ฐฐํฌ์ ๊ฒฝ์ฐ ๋ค์์ด ํ์ํฉ๋๋ค.
ํน์ ์ปฌ๋ ์ ์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด๋ ค๋ฉด ์ ํ๋ฆฌ์ผ์ด์ ์ ํด๋น ์ปฌ๋ ์ ์ ๋ํด
changeStream
๋ฐfind
์กฐ์น๋ฅผ ๋ถ์ฌํ ์ ์๋ ๊ถํ์ด ์์ด์ผ ํฉ๋๋ค.{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } ๋จ์ผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด๋ ค๋ฉด ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ชจ๋ ๋น
system
์ปฌ๋ ์ ์ ๋ํดchangeStream
๋ฐfind
์์ ์ ์ํํ ์ ์๋ ๊ถํ์ด ์์ด์ผ ํฉ๋๋ค.{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } ์ ์ฒด ๋ฐฐํฌ์์ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ์ด๋ ค๋ฉด ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฐฐํฌ์ ๋ชจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ํด
system
์ด ์๋ ๋ชจ๋ collection์ ๋ํดchangeStream
๋ฐfind
์กฐ์น๋ฅผ ํ์ฉํ๋ ๊ถํ์ด ์์ด์ผ ํฉ๋๋ค.{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
์ด๋ฒคํธ ์๋ฆผ
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ณต์ ๋ณธ ์งํฉ์ ์๋ ๋๋ถ๋ถ์ ๋ฐ์ดํฐ ๋ณด์ ๋ฉค๋ฒ์ ์ง์๋ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ๋ด์ฉ์ ๋ํด์๋ง ์๋ฆฝ๋๋ค. ์ด๋ ๊ฒ ํ๋ฉด ์ค๋ฅ ์๋๋ฆฌ์ค์์ ์ง์์ ์ธ ๋๋ค์ ์ปค๋ฐ ๋ณ๊ฒฝ ์ฌํญ์ ๋ํด์๋ง ์๋ฆผ์ด ํธ๋ฆฌ๊ฑฐ๋ฉ๋๋ค.
์๋ฅผ ๋ค์ด ํ๋ผ์ด๋จธ๋ฆฌ์ ๋ํด ์ด๋ฆฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ปค์๊ฐ ์๋ 3-๋ ธ๋ ๋ณต์ ๋ณธ ์ธํธ๊ฐ ์๋ค๊ณ ๊ฐ์ ํด ๋ด ์๋ค. ํด๋ผ์ด์ธํธ๊ฐ ์ฝ์ ์ฐ์ฐ์ ์คํํ๋ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ํด๋น ์ฝ์ ์ด ๋๋ถ๋ถ์ ๋ฐ์ดํฐ ๋ณด์ ๋ ธ๋์ ์ง์๋ ๊ฒฝ์ฐ์๋ง ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ์ ์๋ฆฝ๋๋ค.
์์
์ด ํธ๋์ญ์
๊ณผ ์ฐ๊ฒฐ๋ ๊ฒฝ์ฐ, ๋ณ๊ฒฝ ์ด๋ฒคํธ ๋ฌธ์์ txnNumber
๋ฐ lsid
๊ฐ ํฌํจ๋ฉ๋๋ค.
๋ฐ์ดํฐ ์ ๋ ฌ
๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ช
์์ ์ธ ๋ฐ์ดํฐ ์ ๋ ฌ์ด ์ ๊ณต๋์ง ์๋ ํ simple
์ด์ง ๋น๊ต๋ฅผ ์ฌ์ฉํฉ๋๋ค.
Change Streams ๋ฐ ๊ณ ์ ๋ฌธ์
MongoDB 5.3๋ถํฐ๋ ๋ฒ์ ๋ง์ด๊ทธ๋ ์ด์ ์ค์ ๊ณ ์ ๋ฌธ์์ ๋ํ ์ ๋ฐ์ดํธ์ ๋ํ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ๊ฐ ์์ฑ๋์ง ์์ต๋๋ค.
์ ํ ์ด๋ฏธ์ง๋ฅผ ํฌํจํ๋ ๋ฌธ์์ Change Streams
MongoDB 6.0๋ถํฐ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ๋ฅผ ์ฌ์ฉํ์ฌ ๋ฌธ์์ ๋ณ๊ฒฝ ์ ํ ๋ฒ์ (๋ฌธ์์ ์ ํ ์ด๋ฏธ์ง)์ ์ถ๋ ฅํ ์ ์์ต๋๋ค.
์ฌ์ ์ด๋ฏธ์ง๋ ๋ฌธ์๊ฐ ๊ต์ฒด, ์ ๋ฐ์ดํธ ๋๋ ์ญ์ ๋๊ธฐ ์ ์ ๋ฌธ์์ ๋๋ค. ์ฝ์ ๋ ๋ฌธ์์๋ ์ฌ์ ์ด๋ฏธ์ง๊ฐ ์์ต๋๋ค.
์ฌํ ์ด๋ฏธ์ง๋ ๋ฌธ์๊ฐ ์ฝ์ , ๊ต์ฒด, ์ ๋ฐ์ดํธ๋ ํ์ ๋ฌธ์์ ๋๋ค. ์ญ์ ๋ ๋ฌธ์์ ๋ํ ์ฌํ ์ด๋ฏธ์ง๊ฐ ์์ต๋๋ค.
db.createCollection()
,create
๋๋collMod
์(๋ฅผ) ์ฌ์ฉํ๋ ์ปฌ๋ ์ ์ ๋ํดchangeStreamPreAndPostImages
์(๋ฅผ) ํ์ฑํํ์ธ์.
์ด๋ฏธ์ง๊ฐ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ์ ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ฌธ์ ์ ๋ฐ์ดํธ ๋๋ ์ญ์ ์์ ์ collection์์ ํ์ฑํ๋์ง ์์์ต๋๋ค.
expireAfterSeconds
์์ ์ ํ ์ด๋ฏธ์ง ๋ณด์กด ์๊ฐ ์ค์ ์ดํ์ ์ ๊ฑฐ๋ฉ๋๋ค.๋ค์ ์์์์๋ ์ ์ฒด ํด๋ฌ์คํฐ์์
expireAfterSeconds
๋ฅผ100
์ด๋ก ์ค์ ํฉ๋๋ค.use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) ๋ค์ ์์์์๋
expireAfterSeconds
๋ฑ ํ์ฌchangeStreamOptions
์ค์ ์ ๋ฐํํฉ๋๋ค.db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSeconds
๋ฅผoff
๋ก ์ค์ ํ๋ฉด ๊ธฐ๋ณธ ๋ณด์กด ์ ์ฑ ์ด ์ฌ์ฉ๋๋ฉฐ, ํด๋น ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ๊ฐ oplog์์ ์ ๊ฑฐ๋ ๋๊น์ง ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๊ฐ ๋ณด์กด๋ฉ๋๋ค.๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ๊ฐ oplog์์ ์ ๊ฑฐ๋๋ฉด
expireAfterSeconds
์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง ๋ณด์กด ์๊ฐ์ ๊ด๊ณ์์ด ํด๋น ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ ์ญ์ ๋ฉ๋๋ค.
์ถ๊ฐ ๊ณ ๋ ค ์ฌํญ
์ ํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ๋ฉด ์ ์ฅ ๊ณต๊ฐ์ด ์๋ชจ๋๊ณ ์ฒ๋ฆฌ ์๊ฐ์ด ๋์ด๋ฉ๋๋ค. ํ์ํ ๊ฒฝ์ฐ์๋ง ์ ํ ์ด๋ฏธ์ง๋ฅผ ํ์ฑํํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ํฌ๊ธฐ๋ฅผ 16๋ฉ๋น๋ฐ์ดํธ ๋ฏธ๋ง์ผ๋ก ์ ํํฉ๋๋ค. ์ด๋ฒคํธ ํฌ๊ธฐ๋ฅผ ์ ํํ๋ ค๋ฉด ๋ค์์ ์ํํ๋ฉด ๋ฉ๋๋ค.
๋ฌธ์ ํฌ๊ธฐ๋ฅผ 8๋ฉ๊ฐ๋ฐ์ดํธ๋ก ์ ํํฉ๋๋ค.
updateDescription
๊ณผ ๊ฐ์ ๋ค๋ฅธ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ํ๋๊ฐ ํฌ์ง ์์ ๊ฒฝ์ฐ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์์ ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ๋์์ ์์ฒญํ ์ ์์ต๋๋ค.updateDescription
๊ณผ ๊ฐ์ ๋ค๋ฅธ ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ํ๋๊ฐ ํฌ์ง ์์ ๊ฒฝ์ฐ, ์ต๋ 16๋ฉ๋น๋ฐ์ดํธ ๋ฌธ์์ ๋ํด ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์์ ์ฌํ ์ด๋ฏธ์ง๋ง ์์ฒญํฉ๋๋ค.๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ ์ต๋ 16๋ฉ๋น๋ฐ์ดํธ์ ๋ฌธ์์ ๋ํด ๋ณ๊ฒฝ ์คํธ๋ฆผ ์ถ๋ ฅ์์ ์ฌ์ ์ด๋ฏธ์ง๋ง ์์ฒญํฉ๋๋ค.
๋ฌธ์ ์ ๋ฐ์ดํธ๊ฐ ๋ฌธ์ ๊ตฌ์กฐ๋ ๋ด์ฉ์ ์์ ๋ถ๋ถ์๋ง ์ํฅ์ ๋ฏธ์นฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ
replace
๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ํค์ง ์์ต๋๋ค.replace
์ด๋ฒคํธ์๋ ํญ์ ํ์ด๋ฏธ์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
์ฌ์ ์ด๋ฏธ์ง๋ฅผ ์์ฒญํ๋ ค๋ฉด
db.collection.watch()
์์fullDocumentBeforeChange
๋ฅผrequired
๋๋whenAvailable
๋ก ์ค์ ํฉ๋๋ค. ์ฌํ ์ด๋ฏธ์ง๋ฅผ ์์ฒญํ๋ ค๋ฉด ๋์ผํ ๋ฐฉ๋ฒ์ผ๋กfullDocument
๋ฅผ ์ค์ ํฉ๋๋ค.์ฌ์ ์ด๋ฏธ์ง๊ฐ
config.system.preimages
์ปฌ๋ ์ ์ ๊ธฐ๋ก๋ฉ๋๋ค.config.system.preimages
collection์ ์ปค์ง ์ ์์ต๋๋ค. collection ํฌ๊ธฐ๋ฅผ ์ ํํ๋ ค๋ฉด ์์ ํ์๋ ๋๋ก ์ฌ์ ์ด๋ฏธ์ง์ ๋ํดexpireAfterSeconds
์๊ฐ์ ์ค์ ํ ์ ์์ต๋๋ค.์ฌ์ ์ด๋ฏธ์ง๋ ๋ฐฑ๊ทธ๋ผ์ด๋ ํ๋ก์ธ์ค๊ฐ ๋น๋๊ธฐ์ ์ผ๋ก ์ ๊ฑฐํฉ๋๋ค.
์ค์
์ด์ ๋ฒ์ ๊ณผ ํธํ๋์ง ์๋ ๊ธฐ๋ฅ
MongoDB 6.0๋ถํฐ๋ ๋ณ๊ฒฝ ์คํธ๋ฆผ์ ๋ฌธ์ ์ฌ์ ๋ฐ ์ฌํ ์ด๋ฏธ์ง๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ์ด์ MongoDB ๋ฒ์ ์ผ๋ก ๋ค์ด๊ทธ๋ ์ด๋ํ๊ธฐ ์ ์ collMod
๋ช
๋ น์ ์ฌ์ฉํ์ฌ ๊ฐ collection์ ๋ํด changeStreamPreAndPostImages๋ฅผ ๋นํ์ฑํํด์ผ ํฉ๋๋ค.
๋ค์๋ ์ฐธ์กฐํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผ ์ด๋ฒคํธ ๋ฐ ์ถ๋ ฅ์ ๋ํด์๋ ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์ฐธ์กฐํ์ธ์.
์ปฌ๋ ์ ์์ ๋ณ๊ฒฝ ์ฌํญ์ ํ์ธํ๋ ค๋ฉด
db.collection.watch()
๋ฅผ ์ฐธ์กฐํ์ธ์.๋ณ๊ฒฝ ์คํธ๋ฆผs ์ถ๋ ฅ์ ๋ํ ์ ์ฒด ์์๋ ์ ํ ์ด๋ฏธ์ง๋ฅผ ํฌํจํ๋ ๋ฌธ์์ ๋ณ๊ฒฝ ์คํธ๋ฆผs๋ฅผ ์ฐธ์กฐํ์ธ์.
๋ณ๊ฒฝ ์คํธ๋ฆผs ์ถ๋ ฅ์ ๋ํ ์ ์ฒด ์์๋ ์ ํ ์ด๋ฏธ์ง๋ฅผ ํฌํจํ๋ ๋ฌธ์์ ๋ณ๊ฒฝ ์คํธ๋ฆผs๋ฅผ ์ฐธ์กฐํ์ธ์.