변경 스트림
이 페이지의 내용
변경 스트림을 통해 애플리케이션은 사전에 복잡한 방식 및 수동으로 oplog를 테일링하는 위험 없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 단일 컬렉션, 데이터베이스 또는 전체 배포의 모든 데이터 변경 사항을 구독하고 이에 즉시 대응할 수 있습니다. 변경 스트림은 집계 프레임워크를 사용하기 때문에 애플리케이션에서 특정 변경 사항을 필터링하거나 알림을 마음대로 변환할 수도 있습니다.
가용성
변경 스트림은 복제본 세트 및 샤딩된 클러스터에 사용할 수 있습니다:
스토리지 엔진
복제본 세트와 샤딩된 클러스터는 WiredTiger 스토리지 엔진을 사용해야 합니다. Change Stream은 MongoDB의 미사용 데이터 암호화 기능을 사용하는 배포에서도 사용할 수 있습니다.
복제본 세트 프로토콜 버전.
복제본 세트와 샤딩된 클러스터는 복제본 세트 프로토콜 버전 1(
pv1
)를 사용해야 합니다.읽기 고려 '대다수' 활성화
MongoDB 4.2부터는
"majority"
읽기 고려 지원 여부와 관계없이 변경 스트림을 사용할 수 있습니다. 읽기 고려majority
지원은 활성화(기본값)하거나 비활성화하여 변경 스트림을 사용할 수 있습니다.MongoDB 4.0 이하 버전에서는
"majority"
읽기 고려 지원을 (기본값으로) 활성화한 경우에만 변경 스트림을 사용할 수 있습니다.
Stable API 지원
변경 스트림은 Stable API V1 에 포함되어 있습니다.
연결
변경 스트림에 대한 연결은 +srv
연결 옵션과 함께 DNS 시드 목록을 사용하거나 연결 문자열에 서버를 개별적으로 나열하여 사용할 수 있습니다.
드라이버가 변경 스트림에 대한 연결을 잃거나 연결이 다운되면 클러스터 내 읽기 설정이 일치하는 다른 노드를 통해 변경 스트림에 대한 연결을 다시 설정하려고 시도합니다. 드라이버가 올바른 읽기 설정을 가진 노드를 찾을 수 없으면 예외가 발생합니다.
자세한 내용은 연결 문자열 URI 형식을 참조하세요.
컬렉션, 데이터베이스 또는 배포 보기
다음에 대한 변경 스트림을 열 수 있습니다.
대상 | 설명 |
---|---|
컬렉션 | 단일 컬렉션( 이 페이지의 예시에서는 MongoDB 변경 스트림 드라이버를 사용하여 단일 컬렉션에 대해 커서를 열고 작업합니다. |
데이터베이스 | 단일 데이터베이스( MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. |
배포 | 배포(복제본 세트 또는 샤딩된 클러스터)에 대한 변경 스트림 커서를 열어 MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. |
참고
변경 스트림 예시
이 페이지의 예시에서는 컬렉션에 대한 변경 스트림 커서를 열고 변경 스트림 커서로 작업하는 방법을 설명하기 위해 MongoDB 드라이버를 사용합니다.
변경 스트림 성능 고려 사항
데이터베이스에 대해 열려 있는 변경 스트림의 양이 연결 풀 크기를 초과할 경우 알림 지연이 발생할 수 있습니다. 각 변경 스트림은 다음 이벤트를 기다리는 시간 동안 변경 스트림에 대한 연결과 GetMore 작업을 사용합니다. 지연 시간 문제를 방지하려면 풀 크기가 열려 있는 변경 스트림 수보다 큰지 확인해야 합니다. 자세한 내용은 MaxPoolSize 설정을 참조하세요.
변경 스트림 열기
변경 스트림을 열려면 다음을 수행합니다.
복제본 세트의 경우 데이터 보유 멤버 중 하나에서 변경 스트림 열기 작업을 실행할 수 있습니다.
샤딩된 클러스터의 경우
mongos
에서 변경 스트림 열기 작업을 실행해야 합니다.
다음 예시는 컬렉션에 대한 변경 스트림을 열고 커서를 반복하여 변경 스트림 문서를 조회합니다. [1]
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
아래의 C 예제에서는 MongoDB 복제본 세트 에 연결하고 컬렉션 이 포함된 데이터베이스 에 액세스했다고 inventory
가정합니다.
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
아래의 C# 예제에서는 MongoDB 복제본 세트 에 연결하고 컬렉션 이 포함된 데이터베이스 에 액세스했다고 inventory
가정합니다.
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
아래 Go 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
아래 Java 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스 했다고 가정합니다.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
아래 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션을 포함한 데이터베이스에 액세스 했다고 가정합니다.
cursor = db.inventory.watch() document = await cursor.next()
아래 Node.js 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
아래 Python 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cursor = db.inventory.watch() next(cursor)
아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
cursor = inventory.watch.to_enum next_change = cursor.next
아래 Swift (비동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
아래 Swift(동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory
컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
커서에서 데이터 변경 이벤트를 조회하려면 변경 스트림 커서를 반복합니다. 변경 스트림 이벤트에 대한 자세한 내용은 Change Events를 참조하세요.
변경 스트림 커서는 다음 중 하나가 발생할 때까지 열려 있습니다.
커서가 명시적으로 닫힙니다.
무효화 이벤트가 발생합니다. 예를 들어 컬렉션 제거 또는 이름 바꾸기가 있습니다.
MongoDB 배포에 대한 연결이 닫히거나 시간 초과됩니다. 자세한 내용은 Cursor Behaviors를 참조하십시오.
배포가 샤딩된 클러스터인 경우 분할 제거로 인해 열린 변경 스트림 커서가 닫힐 수 있으며 닫힌 변경 스트림 커서가 완전히 재개되지 않을 수 있습니다.
참고
닫히지 않은 커서의 라이프사이클은 언어에 따라 다릅니다.
[1] | startAtOperationTime 을 지정하여 특정 시점에 커서를 열 수 있습니다. 지정된 시작점이 과거인 경우 oplog의 시간 범위 내에 있어야 합니다. |
변경 스트림 출력 수정
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
목록에는 username
이(가) alice
인 모든 작업 또는 operationType
이(가) delete
인 모든 작업을 필터링하는 단일 $match
단계가 포함되어 있습니다.
pipeline
을 watch()
메서드에 전달하면 변경 스트림이 지정된 pipeline
을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
팁
변경 스트림 이벤트 문서의 _id 필드는 재개 토큰 역할을 합니다. 변경 스트림 이벤트의 _id
필드를 수정하거나 제거하기 위해 파이프라인을 사용하지 마세요.
MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.
변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.
업데이트 작업에 대한 전체 문서 조회
기본적으로 변경 스트림은 업데이트 작업 중에 필드의 델타만 반환합니다. 그러나 업데이트된 문서의 가장 최근 과반수 커밋 버전을 반환하도록 변경 스트림을 구성할 수 있습니다.
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
업데이트된 문서의 최신 다수 커밋 버전을 반환하려면 "updateLookup"
값과 함께 "fullDocument"
옵션을 mongoc_collection_watch
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument
필드가 포함되어 있습니다.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
업데이트된 문서의 과반수 커밋된 최신 버전을 반환하려면 SetFullDocument(options.UpdateLookup)
변경 스트림 옵션을 사용하세요.
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 FullDocument.UPDATE_LOOKUP
을(를) {3} 메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument
필드가 포함되어 있습니다.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 `full_document
필드가 포함되어 있습니다.
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 { fullDocument: 'updateLookup' }
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
을 db.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument
필드가 포함되어 있습니다.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'
을 db.collection.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document
필드가 포함되어 있습니다.
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document: 'updateLookup'
을 db.watch()
메서드에 전달합니다.
아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document
필드가 포함되어 있습니다.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options:
ChangeStreamOptions(fullDocument: .updateLookup)
을(를) {3} 메서드에 전달합니다.
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options:
ChangeStreamOptions(fullDocument: .updateLookup)
을(를) {3} 메서드에 전달합니다.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
참고
업데이트 작업 후, 조회 전에 업데이트된 문서를 수정한 대다수 커밋 작업이 하나 이상 있는 경우 반환되는 전체 문서가 업데이트 작업 시점의 문서와 크게 다를 수 있습니다.
그러나 변경 스트림 문서에 포함된 델타는 항상 해당 변경 스트림 이벤트에 적용된 감시 대상 컬렉션 변경 사항을 정확하게 설명합니다.
변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.
변경 스트림 다시 시작
변경 스트림은 커서를 열 때 재개 토큰을 resumeAfter 또는 startAfter로 지정하여 재개할 수 있습니다.
resumeAfter
Change Streams의 경우
커서를 여는 시점에 재개(resume) 토큰을 resumeAfter
에 전달하여 특정 이벤트 이후에 변경 스트림을 재개할 수 있습니다.
재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.
중요
타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.
무효화 이벤트(예시: 컬렉션 제거 또는 이름 바꾸기)로 인해 스트림이 닫힌 후에는
resumeAfter
를 사용하여 변경 스트림을 다시 시작할 수 없습니다. 대신 무효화 이벤트 후 startAfter를 사용하여 새 변경 스트림을 시작할 수 있습니다.
아래 예에서는 스트림이 삭제된 후 스트림을 다시 생성하기 위해 스트림 옵션에 resumeAfter
옵션이 추가됩니다. 변경 스트림에 _id
을 전달하면 지정된 작업 이후부터 알림을 재개하려고 시도합니다.
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
아래 예에서 resumeToken
은 마지막 변경 스트림 문서에서 검색되어 옵션으로 Watch()
메서드에 전달됩니다. resumeToken
을 Watch()
메서드에 전달하면 변경 스트림이 재개 토큰에 지정된 작업 이후부터 알림을 다시 시작하도록 시도합니다.
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter를 사용하여 변경 스트림에 대한 재개 토큰을 지정할 수 있습니다. resumeAfter 옵션이 설정된 경우 변경 스트림은 재개 토큰에 지정된 작업 후에 알림을 다시 시작합니다. SetResumeAfter
는 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예시에서 resumeToken
.
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter()
메서드를 사용할 수 있습니다. resumeAfter()
메서드는 재개 토큰으로 확인되어야 하는 값(예: 아래 예시의 resumeToken
)을 사용합니다.
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 $resumeToken
.
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after
수정자를 사용할 수 있습니다. resume_after
수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token
을(를) 말합니다.
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter
옵션을 사용할 수 있습니다. resumeAfter
옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken
.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
Change Streams의 경우
커서를 여는 시점에 재개(resume) 토큰을 startAfter
에 전달하여 특정 이벤트 이후에 새로운 변경 스트림을 시작할 수 있습니다. restartAfter 와 달리 startAfter
는 새로운 변경 스트림을 생성하여 무효화 이벤트 후에 알림을 재개할 수 있습니다.
재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.
중요
타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.
토큰 재개
재개 토큰은 여러 출처에서 사용할 수 있습니다.
소스 | 설명 |
---|---|
각 변경 이벤트 알림은 | |
이 필드는 | |
|
MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.
변경 이벤트에서 토큰 재개
변경 이벤트 알림은 _id
필드에 재개 토큰을 포함합니다.
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
에서 토큰 재개 aggregate
aggregate
명령을 사용하는 경우 $changeStream
집계 단계에서 cursor.postBatchResumeToken
필드에 재개 토큰을 포함합니다:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
에서 토큰 재개 getMore
getMore
명령은 cursor.postBatchResumeToken
필드에 재개 토큰을 포함합니다.
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
사용 사례
변경 스트림은 데이터 변경이 지속되면 다운스트림 시스템에 알려주므로 비즈니스 시스템에 의존하는 아키텍처에 도움이 될 수 있습니다. 예를 들어, 변경 스트림은 개발자가 추출, 변환 및 로드(ETL) 서비스, 플랫폼 간 동기화, 협업 기능 및 알림 서비스를 구현할 때 시간을 절약할 수 있습니다.
액세스 제어
자체 관리 배포서버에서 인증 및 권한 부여를 적용한 배포의 경우 다음이 필요합니다.
특정 컬렉션에 대한 변경 스트림을 열려면 애플리케이션에 해당 컬렉션에 대해
changeStream
및find
조치를 부여할 수 있는 권한이 있어야 합니다.{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 단일 데이터베이스에서 변경 스트림을 열려면 애플리케이션에 데이터베이스의 모든 비
system
컬렉션에 대해changeStream
및find
작업을 수행할 수 있는 권한이 있어야 합니다.{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 전체 배포에서 변경 스트림을 열려면 애플리케이션에 배포의 모든 데이터베이스에 대해
system
이 아닌 모든 collection에 대해changeStream
및find
조치를 허용하는 권한이 있어야 합니다.{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
이벤트 알림
변경 스트림은 복제본 집합에 있는 대부분의 데이터 보유 멤버에 지속된 데이터 변경 내용에 대해서만 알립니다. 이렇게 하면 오류 시나리오에서 지속적인 대다수 커밋 변경 사항에 대해서만 알림이 트리거됩니다.
예를 들어 프라이머리에 대해 열린 변경 스트림 커서가 있는 3-노드 복제본 세트가 있다고 가정해 봅시다. 클라이언트가 삽입 연산을 실행하는 경우 변경 스트림은 해당 삽입이 대부분의 데이터 보유 노드에 지속된 경우에만 애플리케이션에 데이터 변경을 알립니다.
작업이 트랜잭션과 연결된 경우, 변경 이벤트 문서에 txnNumber
및 lsid
가 포함됩니다.
데이터 정렬
변경 스트림은 명시적인 데이터 정렬이 제공되지 않는 한 simple
이진 비교를 사용합니다.