변경 스트림
이 페이지의 내용
변경 스트림을 통해 애플리케이션은 사전에 복잡한 방식 및 수동으로 oplog를 테일링하는 위험 없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 단일 컬렉션, 데이터베이스 또는 전체 배포의 모든 데이터 변경 사항을 구독하고 이에 즉시 대응할 수 있습니다. 변경 스트림은 집계 프레임워크를 사용하기 때문에 애플리케이션에서 특정 변경 사항을 필터링하거나 알림을 마음대로 변환할 수도 있습니다.
MongoDB 5.1부터는 변경 스트림이 최적화되어 더 효율적인 리소스 사용률과 일부 집계 파이프라인 단계의 더 빠른 실행을 제공합니다.
가용성
변경 스트림은 복제본 세트 및 샤딩된 클러스터에 사용할 수 있습니다:
스토리지 엔진
복제본 세트와 샤딩된 클러스터는 WiredTiger 스토리지 엔진을 사용해야 합니다. Change Stream은 MongoDB의 미사용 데이터 암호화 기능을 사용하는 배포에서도 사용할 수 있습니다.
복제본 세트 프로토콜 버전.
복제본 세트와 샤딩된 클러스터는 복제본 세트 프로토콜 버전 1(
pv1
)를 사용해야 합니다.읽기 고려 '대다수' 활성화
변경 스트림 은
"majority"
읽기 고려 지원에 관계없이 사용할 수 있습니다. 즉, 읽기 고려majority
지원을 활성화(기본값)하거나 비활성화 하여 변경 스트림을 사용할 수 있습니다.
Stable API 지원
변경 스트림은 Stable API V1 에 포함되어 있습니다. 그러나 showExpandedEvents 옵션은 Stable API V1 에 포함되어 있지 않습니다.
연결
변경 스트림에 대한 연결은 +srv
연결 옵션과 함께 DNS 시드 목록을 사용하거나 연결 문자열에 서버를 개별적으로 나열하여 사용할 수 있습니다.
드라이버가 변경 스트림에 대한 연결을 잃거나 연결이 다운되면 클러스터 내 읽기 설정이 일치하는 다른 노드를 통해 변경 스트림에 대한 연결을 다시 설정하려고 시도합니다. 드라이버가 올바른 읽기 설정을 가진 노드를 찾을 수 없으면 예외가 발생합니다.
자세한 내용은 연결 문자열 URI 형식을 참조하세요.
컬렉션, 데이터베이스 또는 배포 보기
다음에 대한 변경 스트림을 열 수 있습니다.
대상 | 설명 |
---|---|
컬렉션 | 단일 컬렉션( 이 페이지의 예시에서는 MongoDB 변경 스트림 드라이버를 사용하여 단일 컬렉션에 대해 커서를 열고 작업합니다. |
데이터베이스 | 단일 데이터베이스( MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. |
배포 | 배포(복제본 세트 또는 샤딩된 클러스터)에 대한 변경 스트림 커서를 열어 MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. |
참고
변경 스트림 예시
이 페이지의 예시에서는 컬렉션에 대한 변경 스트림 커서를 열고 변경 스트림 커서로 작업하는 방법을 설명하기 위해 MongoDB 드라이버를 사용합니다.
변경 스트림 성능 고려 사항
데이터베이스에 대해 열려 있는 변경 스트림의 양이 연결 풀 크기를 초과할 경우 알림 지연이 발생할 수 있습니다. 각 변경 스트림은 다음 이벤트를 기다리는 시간 동안 변경 스트림에 대한 연결과 GetMore 작업을 사용합니다. 지연 시간 문제를 방지하려면 풀 크기가 열려 있는 변경 스트림 수보다 큰지 확인해야 합니다. 자세한 내용은 MaxPoolSize 설정을 참조하세요.
샤딩된 클러스터 고려 사항
샤딩된 클러스터에서 변경 스트림이 열리는 경우:
mongos
는 각 샤드에서 개별적인 변경 스트림을 생성합니다. 이 동작은 변경 스트림이 특정 샤드 키 범위를 대상으로 하는지 여부에 관계없이 발생합니다.mongos
에서 변경 스트림 결과를 수신하면 해당 결과가 정렬 및 필터링됩니다. 필요한 경우mongos
에서fullDocument
조회도 수행합니다.
최상의 성능을 얻으려면 변경 스트림에서 $lookup
쿼리 사용을 제한하세요.
변경 스트림 열기
변경 스트림을 열려면 다음을 수행합니다.
복제본 세트의 경우 데이터 보유 멤버 중 하나에서 변경 스트림 열기 작업을 실행할 수 있습니다.
샤딩된 클러스터의 경우
mongos
에서 변경 스트림 열기 작업을 실행해야 합니다.
다음 예시는 컬렉션에 대한 변경 스트림을 열고 커서를 반복하여 변경 스트림 문서를 조회합니다. [1]
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
아래 C 예제는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션을 포함한 데이터베이스에 액세스
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
아래 C# 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
아래 Go 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
아래 Java 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스 했다고 가정합니다.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
아래의 코틀린 (Kotlin) 예제에서는 MongoDB 복제본 세트 에 연결되어 있고 inventory
컬렉션 이 포함된 데이터베이스 에 액세스 할 수 있다고 가정합니다. 이러한 작업을 완료하는 방법에 학습 보려면 코틀린 (Kotlin) 루틴 드라이버 데이터베이스 및 컬렉션 가이드 를 참조하세요.
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
아래 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션을 포함한 데이터베이스에 액세스 했다고 가정합니다.
cursor = db.inventory.watch() document = await cursor.next()
아래 Node.js 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
ChangeStream은 EventEmitter를 확장합니다.
아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
아래 Python 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cursor = db.inventory.watch() next(cursor)
아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cursor = inventory.watch.to_enum next_change = cursor.next
아래 Swift (비동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
아래 Swift(동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
커서에서 데이터 변경 이벤트를 조회하려면 변경 스트림 커서를 반복합니다. 변경 스트림 이벤트에 대한 자세한 내용은 Change Events를 참조하세요.
변경 스트림 커서는 다음 중 하나가 발생할 때까지 열려 있습니다.
커서가 명시적으로 닫힙니다.
무효화 이벤트가 발생합니다. 예를 들어 컬렉션 제거 또는 이름 바꾸기가 있습니다.
MongoDB 배포에 대한 연결이 닫히거나 시간 초과됩니다. 자세한 내용은 Cursor Behaviors를 참조하십시오.
배포가 샤딩된 클러스터인 경우 샤드를 제거하면 열려 있는 변경 스트림 커서가 닫힐 수 있습니다. 닫힌 변경 스트림 커서는 완전히 재개되지 않을 수 있습니다.
참고
닫히지 않은 커서의 라이프사이클은 언어에 따라 다릅니다.
[1] | startAtOperationTime 을 지정하여 특정 시점에 커서를 열 수 있습니다. 지정된 시작점이 과거인 경우 oplog의 시간 범위 내에 있어야 합니다. |
변경 스트림 출력 수정
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
목록에는 다음 기준 중 하나 또는 둘 다를 충족하는 작업을 필터링하는 단일 $match
단계가 포함되어 있습니다.
username
값은 다음과 같습니다.alice
operationType
값은 다음과 같습니다.delete
pipeline
을 watch()
메서드에 전달하면 변경 스트림이 지정된 pipeline
을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
pipeline
목록에는 다음 기준 중 하나 또는 둘 다를 충족하는 작업을 필터링하는 단일 $match
단계가 포함되어 있습니다.
username
값은 다음과 같습니다.alice
operationType
값은 다음과 같습니다.delete
pipeline
을 watch()
메서드에 전달하면 변경 스트림이 지정된 pipeline
을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
팁
변경 스트림 이벤트 문서의 _id 필드는 재개 토큰 역할을 합니다. 변경 스트림 이벤트의 _id
필드를 수정하거나 제거하기 위해 파이프라인을 사용하지 마세요.
MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.
변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.
업데이트 작업에 대한 전체 문서 조회
기본적으로 변경 스트림은 업데이트 작업 중에 필드의 델타만 반환합니다. 그러나 업데이트된 문서의 가장 최근 과반수 커밋 버전을 반환하도록 변경 스트림을 구성할 수 있습니다.
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
업데이트된 문서의 최신 다수 커밋 버전을 반환하려면 "updateLookup"
값과 함께 "fullDocument"
옵션을 mongoc_collection_watch
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument
필드가 포함되어 있습니다.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
업데이트된 문서의 과반수 커밋된 최신 버전을 반환하려면 SetFullDocument(options.UpdateLookup)
변경 스트림 옵션을 사용하세요.
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 FullDocument.UPDATE_LOOKUP
을(를) {3} 메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument
필드가 포함되어 있습니다.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
업데이트된 문서 의 가장 최근 다수 FullDocument.UPDATE_LOOKUP
커밋 버전을 반환하려면 을 ChangeStreamFlow.fullDocument() 메서드.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument
필드가 포함되어 있습니다.
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 `full_document
필드가 포함되어 있습니다.
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 { fullDocument: 'updateLookup' }
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
을 db.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document
필드가 포함되어 있습니다.
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document: 'updateLookup'
을 db.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document
필드가 포함되어 있습니다.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options:
ChangeStreamOptions(fullDocument: .updateLookup)
을(를) {3} 메서드에 전달합니다.
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options:
ChangeStreamOptions(fullDocument: .updateLookup)
을(를) {3} 메서드에 전달합니다.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
참고
업데이트 작업 후, 조회 전에 업데이트된 문서를 수정한 대다수 커밋 작업이 하나 이상 있는 경우 반환되는 전체 문서가 업데이트 작업 시점의 문서와 크게 다를 수 있습니다.
그러나 변경 스트림 문서에 포함된 델타는 항상 해당 변경 스트림 이벤트에 적용된 감시 대상 컬렉션 변경 사항을 정확하게 설명합니다.
다음 중 하나에 해당하는 경우 업데이트 이벤트의 fullDocument
필드가 누락될 수 있습니다.
문서가 삭제되거나 업데이트와 조회 사이에 컬렉션이 삭제된 경우.
업데이트로 인해 해당 컬렉션의 샤드 키에 있는 필드 중 하나 이상의 값이 변경되는 경우.
변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.
변경 스트림 다시 시작
변경 스트림은 커서를 열 때 재개 토큰을 resumeAfter 또는 startAfter로 지정하여 재개할 수 있습니다.
resumeAfter
Change Streams의 경우
커서를 여는 시점에 재개(resume) 토큰을 resumeAfter
에 전달하여 특정 이벤트 이후에 변경 스트림을 재개할 수 있습니다.
재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.
중요
타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.
무효화 이벤트(예시: 컬렉션 제거 또는 이름 바꾸기)로 인해 스트림이 닫힌 후에는
resumeAfter
를 사용하여 변경 스트림을 다시 시작할 수 없습니다. 대신 무효화 이벤트 후 startAfter를 사용하여 새 변경 스트림을 시작할 수 있습니다.
아래 예에서는 스트림이 삭제된 후 스트림을 다시 생성하기 위해 스트림 옵션에 resumeAfter
옵션이 추가됩니다. 변경 스트림에 _id
을 전달하면 지정된 작업 이후부터 알림을 재개하려고 시도합니다.
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
아래 예에서 resumeToken
은 마지막 변경 스트림 문서에서 검색되어 옵션으로 Watch()
메서드에 전달됩니다. resumeToken
을 Watch()
메서드에 전달하면 변경 스트림이 재개 토큰에 지정된 작업 이후부터 알림을 다시 시작하도록 시도합니다.
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter를 사용하여 변경 스트림에 대한 재개 토큰을 지정할 수 있습니다. resumeAfter 옵션이 설정된 경우 변경 스트림은 재개 토큰에 지정된 작업 후에 알림을 다시 시작합니다. SetResumeAfter
는 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예시에서 resumeToken
.
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter()
메서드를 사용할 수 있습니다. resumeAfter()
메서드는 재개 토큰으로 확인되어야 하는 값(예: 아래 예시의 resumeToken
)을 사용합니다.
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
ChangeStreamFlow.resumeAfter() 메서드를 사용하여 재개 토큰에 지정된 작업 후 알림 을 다시 시작합니다. resumeAfter()
메서드는 아래 예시 resumeToken
변수와 같이 재개 토큰으로 해석되어야 하는 값을 사용합니다.
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 $resumeToken
.
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
Change Streams의 경우
커서를 여는 시점에 재개(resume) 토큰을 startAfter
에 전달하여 특정 이벤트 이후에 새로운 변경 스트림을 시작할 수 있습니다. restartAfter 와 달리 startAfter
는 새로운 변경 스트림을 생성하여 무효화 이벤트 후에 알림을 재개할 수 있습니다.
재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.
중요
타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.
토큰 재개
재개 토큰은 여러 출처에서 사용할 수 있습니다.
소스 | 설명 |
---|---|
각 변경 이벤트 알림은 _id 필드에 재개 토큰을 포함합니다. | |
이 필드는 | |
getMore 명령은 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다. |
MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.
팁
MongoDB는 mongosh
확장인 "스니펫"을 제공하여 16진수로 인코딩된 재개 토큰을 디코딩합니다.
에서 resumetoken 스니펫을 설치하고 실행할 수 mongosh
있습니다.
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
시스템에 npm
(이)가 설치되어 있는 경우 명령줄에서 (mongosh
(을)를 사용하지 않고)resumetoken을 실행할 수도 있습니다.
npx mongodb-resumetoken-decoder <RESUME TOKEN>
자세한 내용은 다음을 참조하세요.
변경 이벤트에서 토큰 재개
변경 이벤트 알림은 _id
필드에 재개 토큰을 포함합니다.
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
에서 토큰 재개 aggregate
aggregate
명령을 사용하는 경우 $changeStream
집계 단계에서 cursor.postBatchResumeToken
필드에 재개 토큰을 포함합니다:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
에서 토큰 재개 getMore
getMore
명령은 cursor.postBatchResumeToken
필드에 재개 토큰을 포함합니다.
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
사용 사례
변경 스트림은 데이터 변경이 지속되면 다운스트림 시스템에 알려주므로 비즈니스 시스템에 의존하는 아키텍처에 도움이 될 수 있습니다. 예를 들어, 변경 스트림은 개발자가 추출, 변환 및 로드(ETL) 서비스, 플랫폼 간 동기화, 협업 기능 및 알림 서비스를 구현할 때 시간을 절약할 수 있습니다.
액세스 제어
자체 관리 배포서버에서 인증 및 권한 부여를 적용한 배포의 경우 다음이 필요합니다.
특정 컬렉션에 대한 변경 스트림을 열려면 애플리케이션에 해당 컬렉션에 대해
changeStream
및find
조치를 부여할 수 있는 권한이 있어야 합니다.{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 단일 데이터베이스에서 변경 스트림을 열려면 애플리케이션에 데이터베이스의 모든 비
system
컬렉션에 대해changeStream
및find
작업을 수행할 수 있는 권한이 있어야 합니다.{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 전체 배포에서 변경 스트림을 열려면 애플리케이션에 배포의 모든 데이터베이스에 대해
system
이 아닌 모든 collection에 대해changeStream
및find
조치를 허용하는 권한이 있어야 합니다.{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
이벤트 알림
변경 스트림은 복제본 집합에 있는 대부분의 데이터 보유 멤버에 지속된 데이터 변경 내용에 대해서만 알립니다. 이렇게 하면 오류 시나리오에서 지속적인 대다수 커밋 변경 사항에 대해서만 알림이 트리거됩니다.
예를 들어 프라이머리에 대해 열린 변경 스트림 커서가 있는 3-노드 복제본 세트가 있다고 가정해 봅시다. 클라이언트가 삽입 연산을 실행하는 경우 변경 스트림은 해당 삽입이 대부분의 데이터 보유 노드에 지속된 경우에만 애플리케이션에 데이터 변경을 알립니다.
작업이 트랜잭션과 연결된 경우, 변경 이벤트 문서에 txnNumber
및 lsid
가 포함됩니다.
데이터 정렬
변경 스트림은 명시적인 데이터 정렬이 제공되지 않는 한 simple
이진 비교를 사용합니다.
Change Streams 및 고아 문서
MongoDB 5.3부터는 범위 마이그레이션 중에 고아 문서에 대한 업데이트에 대한 변경 스트림 이벤트가 생성되지 않습니다.
전후 이미지를 포함하는 문서의 Change Streams
MongoDB 6.0부터는 변경 스트림 이벤트를 사용하여 문서의 변경 전후 버전(문서의 전후 이미지)을 출력할 수 있습니다.
사전 이미지는 문서가 교체, 업데이트 또는 삭제되기 전의 문서입니다. 삽입된 문서에는 사전 이미지가 없습니다.
사후 이미지는 문서가 삽입, 교체, 업데이트된 후의 문서입니다. 삭제된 문서에 대한 사후 이미지가 없습니다.
db.createCollection()
,create
또는collMod
을(를) 사용하는 컬렉션에 대해changeStreamPreAndPostImages
을(를) 활성화하세요.
이미지가 다음과 같은 경우 변경 스트림 이벤트에 사전 및 사후 이미지를 사용할 수 없습니다.
문서 업데이트 또는 삭제 작업 시 collection에서 활성화되지 않았습니다.
expireAfterSeconds
에서 전후 이미지 보존 시간 설정 이후에 제거됩니다.다음 예시에서는 전체 클러스터에서
expireAfterSeconds
를100
초로 설정합니다.use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 다음 예시에서는
expireAfterSeconds
등 현재changeStreamOptions
설정을 반환합니다.db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSeconds
를off
로 설정하면 기본 보존 정책이 사용되며, 해당 변경 스트림 이벤트가 oplog에서 제거될 때까지 사전 및 사후 이미지가 보존됩니다.변경 스트림 이벤트가 oplog에서 제거되면
expireAfterSeconds
사전 및 사후 이미지 보존 시간에 관계없이 해당 사전 및 사후 이미지도 삭제됩니다.
추가 고려 사항
전후 이미지를 활성화하면 저장 공간이 소모되고 처리 시간이 늘어납니다. 필요한 경우에만 전후 이미지를 활성화하세요.
변경 스트림 이벤트 크기를 16메가바이트 미만으로 제한합니다. 이벤트 크기를 제한하려면 다음을 수행하면 됩니다.
문서 크기를 8메가바이트로 제한합니다.
updateDescription
과 같은 다른 변경 스트림 이벤트 필드가 크지 않은 경우 변경 스트림 출력에서 사전 및 사후 이미지를 동시에 요청할 수 있습니다.updateDescription
과 같은 다른 변경 스트림 이벤트 필드가 크지 않은 경우 최대 16메가바이트 문서에 대해 변경 스트림 출력에서 사후 이미지만 요청합니다.다음과 같은 경우 최대 16메가바이트의 문서에 대해 변경 스트림 출력에서 사전 이미지만 요청합니다.
문서 업데이트가 문서 구조나 내용의 작은 부분에만 영향을 미칩니다. 그리고
replace
변경 이벤트를 발생시키지 않습니다.replace
이벤트에는 항상 후이미지가 포함됩니다.
사전 이미지를 요청하려면
db.collection.watch()
에서fullDocumentBeforeChange
를required
또는whenAvailable
로 설정합니다. 사후 이미지를 요청하려면 동일한 방법으로fullDocument
를 설정합니다.사전 이미지가
config.system.preimages
컬렉션에 기록됩니다.config.system.preimages
collection은 커질 수 있습니다. collection 크기를 제한하려면 앞서 표시된 대로 사전 이미지에 대해expireAfterSeconds
시간을 설정할 수 있습니다.사전 이미지는 백그라운드 프로세스가 비동기적으로 제거합니다.
중요
이전 버전과 호환되지 않는 기능
MongoDB 6.0부터는 변경 스트림에 문서 사전 및 사후 이미지를 사용하는 경우 이전 MongoDB 버전으로 다운그레이드하기 전에 collMod
명령을 사용하여 각 collection에 대해 changeStreamPreAndPostImages를 비활성화해야 합니다.
팁
다음도 참조하세요.
변경 스트림 이벤트 및 출력에 대해서는 변경 이벤트를 참조하세요.
컬렉션에서 변경 사항을 확인하려면
db.collection.watch()
를 참조하세요.변경 스트림s 출력에 대한 전체 예시는 전후 이미지를 포함하는 문서의 변경 스트림s를 참조하세요.
변경 스트림s 출력에 대한 전체 예시는 전후 이미지를 포함하는 문서의 변경 스트림s를 참조하세요.