Docs Menu
Docs Home
/
MongoDB 매뉴얼

변경 스트림

이 페이지의 내용

  • 가용성
  • 연결
  • 컬렉션, 데이터베이스 또는 배포 보기
  • 변경 스트림 성능 고려 사항
  • 변경 스트림 열기
  • 변경 스트림 출력 수정
  • 업데이트 작업에 대한 전체 문서 조회
  • 변경 스트림 다시 시작
  • 사용 사례
  • 액세스 제어
  • 이벤트 알림
  • 데이터 정렬
  • Change Streams 및 고아 문서
  • 전후 이미지를 포함하는 문서의 Change Streams

변경 스트림을 통해 애플리케이션은 사전에 복잡한 방식 및 수동으로 oplog를 테일링하는 위험 없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 단일 컬렉션, 데이터베이스 또는 전체 배포의 모든 데이터 변경 사항을 구독하고 이에 즉시 대응할 수 있습니다. 변경 스트림은 집계 프레임워크를 사용하기 때문에 애플리케이션에서 특정 변경 사항을 필터링하거나 알림을 마음대로 변환할 수도 있습니다.

MongoDB 5.1부터는 변경 스트림이 최적화되어 더 효율적인 리소스 사용률과 일부 집계 파이프라인 단계의 더 빠른 실행을 제공합니다.

변경 스트림은 복제본 세트샤딩된 클러스터에 사용할 수 있습니다:

변경 스트림은 Stable API V1 에 포함되어 있습니다. 그러나 showExpandedEvents 옵션은 Stable API V1 에 포함되어 있지 않습니다.

변경 스트림에 대한 연결은 +srv 연결 옵션과 함께 DNS 시드 목록을 사용하거나 연결 문자열에 서버를 개별적으로 나열하여 사용할 수 있습니다.

드라이버가 변경 스트림에 대한 연결을 잃거나 연결이 다운되면 클러스터 내 읽기 설정이 일치하는 다른 노드를 통해 변경 스트림에 대한 연결을 다시 설정하려고 시도합니다. 드라이버가 올바른 읽기 설정을 가진 노드를 찾을 수 없으면 예외가 발생합니다.

자세한 내용은 연결 문자열 URI 형식을 참조하세요.

다음에 대한 변경 스트림을 열 수 있습니다.

대상
설명
컬렉션

단일 컬렉션(system 컬렉션 또는 admin, localconfig 데이터베이스의 모든 컬렉션 제외)에 대해 변경 스트림 커서를 열 수 있습니다.

이 페이지의 예시에서는 MongoDB 변경 스트림 드라이버를 사용하여 단일 컬렉션에 대해 커서를 열고 작업합니다. mongosh 메서드 db.collection.watch()를 참조하세요.

데이터베이스

단일 데이터베이스( admin, localconfig 데이터베이스 제외)에 대한 변경 스트림 커서를 열어 모든 비시스템 컬렉션에 대한 변경 사항을 관찰할 수 있습니다.

MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. mongosh 메서드 db.watch()도 참조하세요.

배포

배포(복제본 세트 또는 샤딩된 클러스터)에 대한 변경 스트림 커서를 열어 admin, localconfig를 제외한 모든 데이터베이스 전반에서 모든 비시스템 컬렉션에 대한 변경 사항을 확인할 수 있습니다.

MongoDB 드라이버 메서드에 대해서는 드라이버 설명서를 참조하세요. mongosh 메서드 Mongo.watch()도 참조하세요.

참고

변경 스트림 예시

이 페이지의 예시에서는 컬렉션에 대한 변경 스트림 커서를 열고 변경 스트림 커서로 작업하는 방법을 설명하기 위해 MongoDB 드라이버를 사용합니다.

데이터베이스에 대해 열려 있는 변경 스트림의 양이 연결 풀 크기를 초과할 경우 알림 지연이 발생할 수 있습니다. 각 변경 스트림은 다음 이벤트를 기다리는 시간 동안 변경 스트림에 대한 연결과 GetMore 작업을 사용합니다. 지연 시간 문제를 방지하려면 풀 크기가 열려 있는 변경 스트림 수보다 큰지 확인해야 합니다. 자세한 내용은 MaxPoolSize 설정을 참조하세요.

샤딩된 클러스터에서 변경 스트림이 열리는 경우:

  • mongos각 샤드에서 개별적인 변경 스트림을 생성합니다. 이 동작은 변경 스트림이 특정 샤드 키 범위를 대상으로 하는지 여부에 관계없이 발생합니다.

  • mongos에서 변경 스트림 결과를 수신하면 해당 결과가 정렬 및 필터링됩니다. 필요한 경우 mongos에서 fullDocument 조회도 수행합니다.

최상의 성능을 얻으려면 변경 스트림에서 $lookup 쿼리 사용을 제한하세요.

변경 스트림을 열려면 다음을 수행합니다.

  • 복제본 세트의 경우 데이터 보유 멤버 중 하나에서 변경 스트림 열기 작업을 실행할 수 있습니다.

  • 샤딩된 클러스터의 경우 mongos에서 변경 스트림 열기 작업을 실행해야 합니다.

다음 예시는 컬렉션에 대한 변경 스트림을 열고 커서를 반복하여 변경 스트림 문서를 조회합니다. [1]


➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.


아래 C 예제는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션을 포함한 데이터베이스에 액세스했다고 가정합니다.

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

아래 C# 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

아래 Go 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

아래 Java 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스 했다고 가정합니다.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

아래의 코틀린 (Kotlin) 예제에서는 MongoDB 복제본 세트 에 연결되어 있고 inventory 컬렉션 이 포함된 데이터베이스 에 액세스 할 수 있다고 가정합니다. 이러한 작업을 완료하는 방법에 학습 보려면 코틀린 (Kotlin) 루틴 드라이버 데이터베이스 및 컬렉션 가이드 를 참조하세요.

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

아래 예시에서는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션을 포함한 데이터베이스에 액세스 했다고 가정합니다.

cursor = db.inventory.watch()
document = await cursor.next()

아래 Node.js 예시에서는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream은 EventEmitter를 확장합니다.

아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

아래 Python 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

cursor = db.inventory.watch()
next(cursor)

아래 예시에서는 MongoDB 복제본 세트에 연결했고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

cursor = inventory.watch.to_enum
next_change = cursor.next

아래 Swift (비동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

아래 Swift(동기) 예시에서는 MongoDB 복제본 세트에 연결하고 inventory 컬렉션이 포함된 데이터베이스에 액세스했다고 가정합니다.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

커서에서 데이터 변경 이벤트를 조회하려면 변경 스트림 커서를 반복합니다. 변경 스트림 이벤트에 대한 자세한 내용은 Change Events를 참조하세요.

변경 스트림 커서는 다음 중 하나가 발생할 때까지 열려 있습니다.

  • 커서가 명시적으로 닫힙니다.

  • 무효화 이벤트가 발생합니다. 예를 들어 컬렉션 제거 또는 이름 바꾸기가 있습니다.

  • MongoDB 배포에 대한 연결이 닫히거나 시간 초과됩니다. 자세한 내용은 Cursor Behaviors를 참조하십시오.

  • 배포가 샤딩된 클러스터인 경우 샤드를 제거하면 열려 있는 변경 스트림 커서가 닫힐 수 있습니다. 닫힌 변경 스트림 커서는 완전히 재개되지 않을 수 있습니다.

참고

닫히지 않은 커서의 라이프사이클은 언어에 따라 다릅니다.

[1] startAtOperationTime을 지정하여 특정 시점에 커서를 열 수 있습니다. 지정된 시작점이 과거인 경우 oplog의 시간 범위 내에 있어야 합니다.

➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.


변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline 목록에는 다음 기준 중 하나 또는 둘 다를 충족하는 작업을 필터링하는 단일 $match 단계가 포함되어 있습니다.

  • username 값은 다음과 같습니다. alice

  • operationType 값은 다음과 같습니다. delete

pipelinewatch() 메서드에 전달하면 변경 스트림이 지정된 pipeline을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

pipeline 목록에는 다음 기준 중 하나 또는 둘 다를 충족하는 작업을 필터링하는 단일 $match 단계가 포함되어 있습니다.

  • username 값은 다음과 같습니다. alice

  • operationType 값은 다음과 같습니다. delete

pipelinewatch() 메서드에 전달하면 변경 스트림이 지정된 pipeline을 통해 알림을 전달한 후 알림을 반환하도록 지시합니다.

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

변경 스트림을 구성할 때 다음 파이프라인 단계 중 하나 이상의 배열을 제공하여 변경 스트림 출력을 제어할 수 있습니다.

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

변경 스트림 이벤트 문서의 _id 필드는 재개 토큰 역할을 합니다. 변경 스트림 이벤트의 _id 필드를 수정하거나 제거하기 위해 파이프라인을 사용하지 마세요.

MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.

변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.

기본적으로 변경 스트림은 업데이트 작업 중에 필드의 델타만 반환합니다. 그러나 업데이트된 문서의 가장 최근 과반수 커밋 버전을 반환하도록 변경 스트림을 구성할 수 있습니다.


➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.


업데이트된 문서의 최신 다수 커밋 버전을 반환하려면 "updateLookup" 값과 함께 "fullDocument" 옵션을 mongoc_collection_watch 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument 필드가 포함되어 있습니다.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

업데이트된 문서의 과반수 커밋된 최신 버전을 반환하려면 SetFullDocument(options.UpdateLookup) 변경 스트림 옵션을 사용하세요.

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 FullDocument.UPDATE_LOOKUP을(를) {3} 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument 필드가 포함되어 있습니다.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

업데이트된 문서 의 가장 최근 다수 FullDocument.UPDATE_LOOKUP 커밋 버전을 반환하려면 을 ChangeStreamFlow.fullDocument() 메서드.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 FullDocument 필드가 포함되어 있습니다.

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 `full_document 필드가 포함되어 있습니다.

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 { fullDocument: 'updateLookup' }db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

다음 예는 스트림을 사용하여 변경 이벤트를 프로세스하는 예시입니다.

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

또는 반복기를 사용하여 변경 이벤트를 프로세스할 수도 있습니다.

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"db.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 fullDocument 필드가 포함되어 있습니다.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document='updateLookup'db.collection.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document 필드가 포함되어 있습니다.

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 full_document: 'updateLookup'db.watch() 메서드에 전달합니다.

아래 예에서 모든 업데이트 작업 알림에는 업데이트 작업의 영향을 받는 문서의 현재 버전을 나타내는 full_document 필드가 포함되어 있습니다.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options: ChangeStreamOptions(fullDocument: .updateLookup)을(를) {3} 메서드에 전달합니다.

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

업데이트된 문서의 가장 최근에 커밋된 버전을 반환하려면 options: ChangeStreamOptions(fullDocument: .updateLookup)을(를) {3} 메서드에 전달합니다.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

참고

업데이트 작업 , 조회 전에 업데이트된 문서를 수정한 대다수 커밋 작업이 하나 이상 있는 경우 반환되는 전체 문서가 업데이트 작업 시점의 문서와 크게 다를 수 있습니다.

그러나 변경 스트림 문서에 포함된 델타는 항상 해당 변경 스트림 이벤트에 적용된 감시 대상 컬렉션 변경 사항을 정확하게 설명합니다.

다음 중 하나에 해당하는 경우 업데이트 이벤트의 fullDocument 필드가 누락될 수 있습니다.

  • 문서가 삭제되거나 업데이트와 조회 사이에 컬렉션이 삭제된 경우.

  • 업데이트로 인해 해당 컬렉션의 샤드 키에 있는 필드 중 하나 이상의 값이 변경되는 경우.

변경 스트림 응답 문서 형식에 대한 자세한 내용은 Change Events를 참조하세요.

변경 스트림은 커서를 열 때 재개 토큰을 resumeAfter 또는 startAfter로 지정하여 재개할 수 있습니다.

커서를 여는 시점에 재개(resume) 토큰을 resumeAfter에 전달하여 특정 이벤트 이후에 변경 스트림을 재개할 수 있습니다.

재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.

중요

  • 타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.

  • 무효화 이벤트(예시: 컬렉션 제거 또는 이름 바꾸기)로 인해 스트림이 닫힌 후에는 resumeAfter를 사용하여 변경 스트림을 다시 시작할 수 없습니다. 대신 무효화 이벤트startAfter를 사용하여 새 변경 스트림을 시작할 수 있습니다.

아래 예에서는 스트림이 삭제된 후 스트림을 다시 생성하기 위해 스트림 옵션에 resumeAfter 옵션이 추가됩니다. 변경 스트림에 _id을 전달하면 지정된 작업 이후부터 알림을 재개하려고 시도합니다.

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

아래 예에서 resumeToken은 마지막 변경 스트림 문서에서 검색되어 옵션으로 Watch() 메서드에 전달됩니다. resumeTokenWatch() 메서드에 전달하면 변경 스트림이 재개 토큰에 지정된 작업 이후부터 알림을 다시 시작하도록 시도합니다.

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

ChangeStreamOptions.SetResumeAfter를 사용하여 변경 스트림에 대한 재개 토큰을 지정할 수 있습니다. resumeAfter 옵션이 설정된 경우 변경 스트림은 재개 토큰에 지정된 작업 후에 알림을 다시 시작합니다. SetResumeAfter는 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예시에서 resumeToken.

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter() 메서드를 사용할 수 있습니다. resumeAfter() 메서드는 재개 토큰으로 확인되어야 하는 값(예: 아래 예시의 resumeToken)을 사용합니다.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

ChangeStreamFlow.resumeAfter() 메서드를 사용하여 재개 토큰에 지정된 작업 후 알림 을 다시 시작합니다. resumeAfter() 메서드는 아래 예시 resumeToken 변수와 같이 재개 토큰으로 해석되어야 하는 값을 사용합니다.

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken.

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 $resumeToken.

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resume_after 수정자를 사용할 수 있습니다. resume_after 수정자는 재개 토큰으로 확인해야 하는 값을 사용합니다. 예를 들어 아래 예시의 resume_token을(를) 말합니다.

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken.

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

재개 토큰에 지정된 작업 후에 알림을 재개하려면 resumeAfter 옵션을 사용할 수 있습니다. resumeAfter 옵션은 재개 토큰으로 확인되어야 하는 값을 취합니다. 예시: 아래 예에서 resumeToken.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

커서를 여는 시점에 재개(resume) 토큰을 startAfter에 전달하여 특정 이벤트 이후에 새로운 변경 스트림을 시작할 수 있습니다. restartAfter 와 달리 startAfter는 새로운 변경 스트림을 생성하여 무효화 이벤트 후에 알림을 재개할 수 있습니다.

재개 토큰에 대한 자세한 내용은 재개 토큰을 참조하세요.

중요

  • 타임스탬프가 과거에 있었던 경우, oplog에 토큰 또는 타임스탬프와 관련된 작업을 찾을 수 있는 충분한 기록이 있어야 합니다.

재개 토큰은 여러 출처에서 사용할 수 있습니다.

소스
설명
각 변경 이벤트 알림은 _id 필드에 재개 토큰을 포함합니다.

$changeStream 집계 단계에는 cursor.postBatchResumeToken 필드에 재개 토큰이 포함됩니다.

이 필드는 aggregate 명령을 사용할 때만 나타납니다.

getMore 명령은 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다.

MongoDB 4.2부터는 변경 스트림 집계 파이프라인이 이벤트의 _id 필드를 수정하는 경우 변경 스트림이 예외를 발생시킵니다.

MongoDB는 mongosh 확장인 "스니펫"을 제공하여 16진수로 인코딩된 재개 토큰을 디코딩합니다.

에서 resumetoken 스니펫을 설치하고 실행할 수 mongosh 있습니다.

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

시스템에 npm(이)가 설치되어 있는 경우 명령줄에서 (mongosh(을)를 사용하지 않고)resumetoken을 실행할 수도 있습니다.

npx mongodb-resumetoken-decoder <RESUME TOKEN>

자세한 내용은 다음을 참조하세요.

변경 이벤트 알림은 _id 필드에 재개 토큰을 포함합니다.

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

aggregate 명령을 사용하는 경우 $changeStream 집계 단계에서 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 명령은 cursor.postBatchResumeToken 필드에 재개 토큰을 포함합니다.

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

변경 스트림은 데이터 변경이 지속되면 다운스트림 시스템에 알려주므로 비즈니스 시스템에 의존하는 아키텍처에 도움이 될 수 있습니다. 예를 들어, 변경 스트림은 개발자가 추출, 변환 및 로드(ETL) 서비스, 플랫폼 간 동기화, 협업 기능 및 알림 서비스를 구현할 때 시간을 절약할 수 있습니다.

자체 관리 배포서버에서 인증권한 부여를 적용한 배포의 경우 다음이 필요합니다.

  • 특정 컬렉션에 대한 변경 스트림을 열려면 애플리케이션에 해당 컬렉션에 대해 changeStreamfind 조치를 부여할 수 있는 권한이 있어야 합니다.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 단일 데이터베이스에서 변경 스트림을 열려면 애플리케이션에 데이터베이스의 모든 비system 컬렉션에 대해 changeStreamfind작업을 수행할 수 있는 권한이 있어야 합니다.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 전체 배포에서 변경 스트림을 열려면 애플리케이션에 배포의 모든 데이터베이스에 대해system 이 아닌 모든 collection에 대해 changeStreamfind 조치를 허용하는 권한이 있어야 합니다.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

변경 스트림은 복제본 집합에 있는 대부분의 데이터 보유 멤버에 지속된 데이터 변경 내용에 대해서만 알립니다. 이렇게 하면 오류 시나리오에서 지속적인 대다수 커밋 변경 사항에 대해서만 알림이 트리거됩니다.

예를 들어 프라이머리에 대해 열린 변경 스트림 커서가 있는 3-노드 복제본 세트가 있다고 가정해 봅시다. 클라이언트가 삽입 연산을 실행하는 경우 변경 스트림은 해당 삽입이 대부분의 데이터 보유 노드에 지속된 경우에만 애플리케이션에 데이터 변경을 알립니다.

작업이 트랜잭션과 연결된 경우, 변경 이벤트 문서에 txnNumberlsid가 포함됩니다.

변경 스트림은 명시적인 데이터 정렬이 제공되지 않는 한 simple 이진 비교를 사용합니다.

MongoDB 5.3부터는 범위 마이그레이션 중에 고아 문서에 대한 업데이트에 대한 변경 스트림 이벤트가 생성되지 않습니다.

MongoDB 6.0부터는 변경 스트림 이벤트를 사용하여 문서의 변경 전후 버전(문서의 전후 이미지)을 출력할 수 있습니다.

  • 사전 이미지는 문서가 교체, 업데이트 또는 삭제되기 전의 문서입니다. 삽입된 문서에는 사전 이미지가 없습니다.

  • 사후 이미지는 문서가 삽입, 교체, 업데이트된 후의 문서입니다. 삭제된 문서에 대한 사후 이미지가 없습니다.

  • db.createCollection(), create 또는 collMod을(를) 사용하는 컬렉션에 대해 changeStreamPreAndPostImages을(를) 활성화하세요.

이미지가 다음과 같은 경우 변경 스트림 이벤트에 사전 및 사후 이미지를 사용할 수 없습니다.

  • 문서 업데이트 또는 삭제 작업 시 collection에서 활성화되지 않았습니다.

  • expireAfterSeconds에서 전후 이미지 보존 시간 설정 이후에 제거됩니다.

    • 다음 예시에서는 전체 클러스터에서 expireAfterSeconds100초로 설정합니다.

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 다음 예시에서는 expireAfterSeconds 등 현재 changeStreamOptions 설정을 반환합니다.

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSecondsoff로 설정하면 기본 보존 정책이 사용되며, 해당 변경 스트림 이벤트가 oplog에서 제거될 때까지 사전 및 사후 이미지가 보존됩니다.

    • 변경 스트림 이벤트가 oplog에서 제거되면 expireAfterSeconds 사전 및 사후 이미지 보존 시간에 관계없이 해당 사전 및 사후 이미지도 삭제됩니다.

추가 고려 사항

  • 전후 이미지를 활성화하면 저장 공간이 소모되고 처리 시간이 늘어납니다. 필요한 경우에만 전후 이미지를 활성화하세요.

  • 변경 스트림 이벤트 크기를 16메가바이트 미만으로 제한합니다. 이벤트 크기를 제한하려면 다음을 수행하면 됩니다.

    • 문서 크기를 8메가바이트로 제한합니다. updateDescription과 같은 다른 변경 스트림 이벤트 필드가 크지 않은 경우 변경 스트림 출력에서 사전 및 사후 이미지를 동시에 요청할 수 있습니다.

    • updateDescription과 같은 다른 변경 스트림 이벤트 필드가 크지 않은 경우 최대 16메가바이트 문서에 대해 변경 스트림 출력에서 사후 이미지만 요청합니다.

    • 다음과 같은 경우 최대 16메가바이트의 문서에 대해 변경 스트림 출력에서 사전 이미지만 요청합니다.

      • 문서 업데이트가 문서 구조나 내용의 작은 부분에만 영향을 미칩니다. 그리고

      • replace 변경 이벤트를 발생시키지 않습니다. replace 이벤트에는 항상 후이미지가 포함됩니다.

  • 사전 이미지를 요청하려면 db.collection.watch()에서 fullDocumentBeforeChangerequired 또는 whenAvailable로 설정합니다. 사후 이미지를 요청하려면 동일한 방법으로 fullDocument를 설정합니다.

  • 사전 이미지가 config.system.preimages 컬렉션에 기록됩니다.

    • config.system.preimages collection은 커질 수 있습니다. collection 크기를 제한하려면 앞서 표시된 대로 사전 이미지에 대해 expireAfterSeconds 시간을 설정할 수 있습니다.

    • 사전 이미지는 백그라운드 프로세스가 비동기적으로 제거합니다.

중요

이전 버전과 호환되지 않는 기능

MongoDB 6.0부터는 변경 스트림에 문서 사전 및 사후 이미지를 사용하는 경우 이전 MongoDB 버전으로 다운그레이드하기 전에 collMod 명령을 사용하여 각 collection에 대해 changeStreamPreAndPostImages를 비활성화해야 합니다.

다음도 참조하세요.

변경 스트림s 출력에 대한 전체 예시는 전후 이미지를 포함하는 문서의 변경 스트림s를 참조하세요.

돌아가기

제한 사항