Docs Menu
Docs Home
/
MongoDB 매뉴얼

변경 스트림

이 페이지의 내용

  • 가용성
  • 연결
  • 컬렉션, 데이터베이스 또는 배포 보기
  • Change Stream 성능 고려 사항
  • Change Stream 열기
  • Change Stream 출력 수정
  • 업데이트 작업에 대한 전체 문서 조회
  • 변경 스트림 다시 시작
  • 사용 사례
  • 액세스 제어
  • 이벤트 알림
  • 데이터 정렬

변경 스트림을 사용하면 애플리케이션이 oplog 를 수동으로 테일링해야 하는 복잡성과 위험 없이 실시간 데이터 변경 사항에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 단일 컬렉션, 데이터베이스 또는 전체 배포의 모든 데이터 변경 사항을 구독하고 즉시 대응할 수 있습니다. 변경 스트림은 애그리게이션 프레임워크를 사용하기 때문에 애플리케이션에서 특정 변경 사항을 필터링하거나 알림을 마음대로 변환할 수도 있습니다.

변경 스트림은 복제본 세트샤드 클러스터에 사용할 수 있습니다:

변경 스트림은 Stable API V1 에 포함되어 있습니다.

변경 스트림에 대한 연결은 +srv 연결 옵션과 함께 DNS 시드 목록을 사용하거나 연결 문자열에 서버를 개별적으로 나열하여 사용할 수 있습니다.

드라이버가 변경 스트림에 대한 연결을 잃거나 연결이 다운되면 클러스터 내 읽기 설정이 일치하는 다른 노드를 통해 변경 스트림에 대한 연결을 다시 설정하려고 시도합니다. 드라이버가 올바른 읽기 설정을 가진 노드를 찾을 수 없으면 예외가 발생합니다.

자세한 내용은 연결 문자열 URI 형식을 참조하세요.

다음에 대한 change stream을 열 수 있습니다.

대상
설명
컬렉션

단일 컬렉션(system 컬렉션 또는 admin, localconfig 데이터베이스의 모든 컬렉션 제외)에 대해 변경 스트림 커서를 열 수 있습니다.

이 페이지의 예시에서는 MongoDB 변경 스트림 드라이버를 사용하여 단일 컬렉션에 대해 커서를 열고 작업합니다. mongosh 메서드 db.collection.watch()도 참조하세요.

데이터베이스

단일 데이터베이스( admin, localconfig 데이터베이스 제외)에 대한 변경 스트림 커서를 열어 모든 비시스템 컬렉션에 대한 변경 사항을 감시할 수 있습니다.

MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. mongosh 메서드 db.watch()도 참조하세요.

배포

배포(복제본 세트 또는 샤드 클러스터)에 대한 변경 스트림 커서를 열어 admin, localconfig 를 제외한 모든 데이터베이스에서 모든 비시스템 컬렉션에 대한 변경 사항을 감시할 수 있습니다.

MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. mongosh 메서드 Mongo.watch()도 참조하세요.

참고

Change Stream 예시

이 페이지의 예제에서는 컬렉션에 대한 변경 스트림 커서를 열고 변경 스트림 커서로 작업하는 방법을 설명하기 위해 MongoDB 드라이버를 사용합니다.

데이터베이스에 대해 열려 있는 변경 스트림의 양이 연결 풀 크기를 초과할 경우 알림 지연이 발생할 수 있습니다. 각 변경 스트림은 다음 이벤트를 기다리는 시간 동안 변경 스트림에 대한 연결과 GetMore 작업을 사용합니다. 지연 시간 문제를 방지하려면 풀 크기가 열려 있는 변경 스트림 수보다 큰지 확인해야 합니다. 자세한 내용은 MaxPoolSize 설정을 참조하세요.

변경 스트림을 열려면 다음을 수행합니다.

  • 복제본 세트의 경우 데이터 보유 멤버 중 하나에서 변경 스트림 열기 작업을 실행할 수 있습니다.

  • 샤드 클러스터의 경우 mongos에서 변경 스트림 열기 작업을 실행해야 합니다.

다음 예시는 컬렉션에 대한 변경 스트림을 열고 커서를 반복하여 변경 스트림 문서를 조회합니다. [1]


➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예제의 언어를 설정하세요.


아래의 C 예제에서는 MongoDB 복제본 세트에 연결되어 있고 데이터베이스 에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

아래 C# 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

아래의 Go 예제에서는 MongoDB 복제본 세트에 연결되어 있고 데이터베이스에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

아래의 Java 예제에서는 MongoDB 복제본 세트에 연결되어 있고 데이터베이스에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

아래 예제에서는 MongoDB 복제본 세트에 연결하고 데이터베이스에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

cursor = db.inventory.watch()
document = await cursor.next()

아래의 Node.js 예제에서는 MongoDB 복제본 세트에 연결되어 있고 데이터베이스에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예제입니다.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

아래의 Python 예제에서는 MongoDB 복제본 세트에 연결 하고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

cursor = db.inventory.watch()
next(cursor)

아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

cursor = inventory.watch.to_enum
next_change = cursor.next

아래 Swift (비동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

아래의 Swift(동기화) 예제에서는 MongoDB 복제본 세트에 연결하고 데이터베이스에 액세스 inventory 했다고 가정합니다. 컬렉션을 포함합니다.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

커서에서 데이터 변경 이벤트를 조회하려면 변경 스트림 커서를 반복합니다. 변경 스트림 이벤트에 대한 자세한 내용은 Change Events를 참조하세요.

변경 스트림 커서는 다음 중 하나가 발생할 때까지 열려 있습니다.

  • 커서가 명시적으로 닫힙니다.

  • 무효화 이벤트가 발생합니다. 예를 들어 컬렉션 삭제 또는 이름 바꾸기가 있습니다.

  • MongoDB 배포에 대한 연결이 닫히거나 시간 초과됩니다. 자세한 내용은 Cursor Behaviors를 참조하십시오.

  • 배포가 분할된 클러스터인 경우 분할 제거로 인해 열린 변경 스트림 커서가 닫힐 수 있으며 닫힌 변경 스트림 커서가 완전히 재개되지 않을 수 있습니다.

참고

닫히지 않은 커서의 라이프사이클은 언어에 따라 다릅니다.

[1] startAtOperationTime 를 지정하여 특정 시점에 커서를 열 수 있습니다. 지정된 시작점이 과거인 경우 oplog의 시간 범위 내에 있어야 합니다.

➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예제의 언어를 설정하세요.


변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline 목록에는 username이(가) alice인 모든 작업 또는 operationType이(가) delete인 모든 작업을 필터링하는 단일 $match 단계가 포함되어 있습니다.

pipelinewatch() 메서드에 전달하면 변경 스트림이 지정된 pipeline을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예제입니다.

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

변경 스트림 이벤트 문서의 _id 필드는 재개 토큰 역할을 합니다. 변경 스트림 이벤트의 _id 필드를 수정하거나 제거하기 위해 파이프라인을 사용하지 마세요.

MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.

변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.

기본적으로 변경 스트림은 업데이트 작업 중에 필드의 델타만 반환합니다. 그러나 업데이트된 문서의 가장 최근 과반수 커밋 버전을 반환하도록 변경 스트림을 구성할 수 있습니다.


➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예제의 언어를 설정하세요.


업데이트된 문서의 최신 다수 커밋 버전을 반환하려면 "updateLookup" 값과 함께 "fullDocument" 옵션을 mongoc_collection_watch 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument 필드가 포함되어 있습니다.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

업데이트된 문서의 과반수 커밋된 최신 버전을 반환하려면 SetFullDocument(options.UpdateLookup) 변경 스트림 옵션을 사용하세요.

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 FullDocument.UPDATE_LOOKUP을(를) {3} 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument 필드가 포함되어 있습니다.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 `full_document 필드가 포함되어 있습니다.

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 { fullDocument: 'updateLookup' }db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예제입니다.

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"db.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document 필드가 포함되어 있습니다.

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document: 'updateLookup'db.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document 필드가 포함되어 있습니다.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options: ChangeStreamOptions(fullDocument: .updateLookup)을(를) {3} 메서드에 전달합니다.

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options: ChangeStreamOptions(fullDocument: .updateLookup)을(를) {3} 메서드에 전달합니다.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

참고

업데이트 작업 , 조회 전에 업데이트된 문서를 수정한 대다수 커밋 작업이 하나 이상 있는 경우 반환되는 전체 문서가 업데이트 작업 시점의 문서와 크게 다를 수 있습니다.

그러나 변경 스트림 문서에 포함된 델타는 항상 해당 변경 스트림 이벤트에 적용된 감시 대상 컬렉션 변경 사항을 정확하게 설명합니다.

변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.

변경 스트림은 커서를 열 때 재개 토큰을 resumeAfter 또는 startAfter로 지정하여 재개할 수 있습니다.

커서를 여는 시점에 재개(resume) 토큰을 resumeAfter에 전달하여 특정 이벤트 이후에 변경 스트림을 재개할 수 있습니다.

재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.

중요

  • 타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.

  • 무효화 이벤트(예시: 컬렉션 제거 또는 이름 바꾸기)로 인해 스트림이 닫힌 후에는 resumeAfter를 사용하여 변경 스트림을 다시 시작할 수 없습니다. 대신 무효화 이벤트startAfter를 사용하여 새 변경 스트림을 시작할 수 있습니다.

아래 예에서는 스트림이 삭제된 후 스트림을 다시 생성하기 위해 스트림 옵션에 resumeAfter 옵션이 추가됩니다. 변경 스트림에 _id을 전달하면 지정된 작업 이후부터 알림을 재개하려고 시도합니다.

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

아래 예에서는 마지막 변경 스트림 문서에서 resumeToken 를 검색하여 Watch() 메서드에 옵션으로 전달합니다. resumeTokenWatch() 메서드에 전달하면 변경 스트림이 재개 토큰에 지정된 작업 이후부터 알림을 재개하려고 시도하도록 지시합니다.

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

ChangeStreamOptions.SetResumeAfter 를 사용할 수 있습니다. 변경 스트림에 대한 재개 토큰을 지정합니다. resumeAfter 옵션이 설정되면 변경 스트림은 재개 토큰에 지정된 작업 후 알림을 다시 시작합니다. 는 SetResumeAfter 재개 토큰으로 해석되어야 하는 값을resumeToken 사용합니다(예: 아래 예에서 ).

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter() 메서드를 사용할 수 있습니다. resumeAfter() 메서드는 재개 토큰으로 확인되어야 하는 값(예: 아래 예시의 resumeToken)을 사용합니다.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예: 아래 예에서 resumeToken.

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예: 아래 예에서 $resumeToken.

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예: 아래 예에서 resumeToken.

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예: 아래 예에서 resumeToken.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

커서를 여는 시점에 재개(resume) 토큰을 startAfter에 전달하여 특정 이벤트 이후에 새로운 변경 스트림을 시작할 수 있습니다. restartAfter 와 달리 startAfter는 새로운 변경 스트림을 생성하여 무효화 이벤트 후에 알림을 재개할 수 있습니다.

재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.

중요

  • 타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.

재개 토큰은 여러 출처에서 사용할 수 있습니다.

소스
설명
각 변경 이벤트 알림은 _id 필드에 재개 토큰을 포함합니다.

$changeStream 애그리게이션 단계에는 cursor.postBatchResumeToken 필드에 재개 토큰이 포함됩니다.

이 필드는 aggregate 명령을 사용할 때만 나타납니다.

getMore 명령은 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다.

MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.

변경 이벤트 알림은 _id 필드에 재개 토큰을 포함합니다.

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

aggregate 명령을 사용하는 경우 $changeStream 애그리게이션 단계에서 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 명령은 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다.

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

변경 스트림은 데이터 변경이 지속되면 다운스트림 시스템에 알려주므로 비즈니스 시스템에 의존하는 아키텍처에 도움이 될 수 있습니다. 예를 들어, 변경 스트림은 개발자가 추출, 변환 및 로드(ETL) 서비스, 플랫폼 간 동기화, 협업 기능 및 알림 서비스를 구현할 때 시간을 절약할 수 있습니다.

자체 관리 배포서버에 인증권한 부여를 적용하는 배포의 경우:

  • 특정 컬렉션에 대한 변경 스트림을 열려면 애플리케이션에 해당 컬렉션에 대해 changeStreamfind 조치를 부여할 수 있는 권한이 있어야 합니다.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 단일 데이터베이스에서 변경 스트림을 열려면 애플리케이션에 데이터베이스의 모든 비system 컬렉션에 대해 changeStreamfind작업을 수행할 수 있는 권한이 있어야 합니다.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 전체 배포에서 change stream을 열려면 애플리케이션에 배포의 모든 데이터베이스에 대해system 이 아닌 모든 collection에 대해 changeStreamfind 조치를 허용하는 권한이 있어야 합니다.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

변경 스트림은 복제본 집합에 있는 대부분의 데이터 보유 멤버에 지속된 데이터 변경 내용에 대해서만 알립니다. 이렇게 하면 오류 시나리오에서 지속적인 대다수 커밋 변경 사항에 대해서만 알림이 트리거됩니다.

예를 들어 프라이머리에 대해 열린 change stream 커서가 있는 3-노드 복제본 세트가 있다고 가정해 봅시다. 클라이언트가 삽입 연산을 실행하는 경우 change stream은 해당 삽입이 대부분의 데이터 보유 노드에 지속된 경우에만 애플리케이션에 데이터 변경을 알립니다.

작업이 트랜잭션과 연결된 경우, 변경 이벤트 문서에 txnNumberlsid가 포함됩니다.

변경 스트림은 명시적인 데이터 정렬이 제공되지 않는 한 simple 이진 비교를 사용합니다.

돌아가기

시계열