문서 메뉴
문서 홈
/ / /
C#/.NET
/ / /

데이터 변경 사항 모니터링

이 페이지의 내용

  • 개요
  • 샘플 데이터
  • Change Stream 열기
  • Change Stream 출력 수정
  • Watch() 동작 수정
  • 사전 이미지 및 사후 이미지 포함하기
  • 추가 정보
  • API 문서

이 가이드에서는 변경 스트림 을 사용하여 데이터의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. 변경 스트림은 애플리케이션이 컬렉션, 데이터베이스 또는 배포의 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다.

이 가이드의 예제에서는 Atlas 샘플 데이터 세트sample_restaurants.restaurants 컬렉션을 사용합니다. 무료 MongoDB Atlas 클러스터를 생성하고 샘플 데이터 세트를 로드하는 방법을 알아보려면 빠른 시작을 참조하세요.

이 페이지의 예제에서는 다음 Restaurant, AddressGradeEntry 클래스를 모델로 사용합니다:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

참고

restaurants 컬렉션의 문서는 카멜 케이스 명명 규칙을 사용합니다. 이 가이드의 예제에서는 ConventionPack 사용하여 컬렉션의 필드를 파스칼식 대/소문자로 역직렬화하고 Restaurant 클래스의 속성에 매핑합니다.

사용자 지정 직렬화에 대해 자세히 알아보려면 사용자 지정 직렬화를참조하세요.

변경 스트림을 열려면 Watch() 또는 WatchAsync() 메서드를 호출합니다. 메서드를 호출하는 인스턴스에 따라 변경 스트림이 수신 대기하는 이벤트 범위가 결정됩니다. 다음 클래스에서 Watch() 또는 WatchAsync() 메서드를 호출할 수 있습니다.

  • MongoClient: MongoDB deployment의 모든 변경 사항을 모니터링합니다.

  • Database: 데이터베이스에 있는 모든 컬렉션의 변경 사항을 모니터링합니다.

  • Collection: 컬렉션의 변경 사항을 모니터링합니다.

다음 예시에서는 restaurants 컬렉션에서 변경 스트림을 열고 변경 사항이 발생할 때 출력합니다. Asynchronous 또는 Synchronous 탭을 선택하여 해당 코드를 확인합니다.

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}

변경 사항을 확인하려면 애플리케이션을 실행하세요. 그런 다음 별도의 애플리케이션 또는 셸에서 restaurants 컬렉션을 수정합니다. "name" 값이 "Blarney Castle" 인 문서를 업데이트하면 다음과 같은 변경 스트림 출력이 생성됩니다.

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

pipeline 매개변수를 Watch()WatchAsync() 메서드에 전달하여 변경 스트림 출력을 수정할 수 있습니다. 이 매개변수를 사용하면 지정된 변경 이벤트만 감시할 수 있습니다. EmptyPipelineDefinition 클래스를 사용하고 관련 애그리게이션 단계 메서드를 추가하여 파이프라인을 만듭니다.

pipeline 매개변수에 다음과 같은 애그리게이션 단계를 지정할 수 있습니다.

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

PipelineDefinitionBuilder 클래스를 사용하여 집계 파이프라인을 빌드하는 방법을 알아보려면 빌더를 사용한 작업 가이드 의 집계 파이프라인 빌드 를 참조하세요.

다음 예제에서는 pipeline 매개 변수를 사용하여 업데이트 작업만 기록하는 변경 스트림을 엽니다. Asynchronous 또는 Synchronous 탭을 선택하여 해당 코드를 확인합니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = _restaurantsCollection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}

변경 스트림 출력 수정에 대해 자세히 알아보려면 MongoDB Server 매뉴얼의 변경 스트림 출력 수정 섹션을 참조하세요.

Watch()WatchAsync() 메서드는 작업을 구성하는 데 사용할 수 있는 옵션을 나타내는 선택적 매개변수를 허용합니다. 옵션을 지정하지 않으면 드라이버는 작업을 사용자 지정하지 않습니다.

다음 표에서는 Watch()WatchAsync() 의 동작을 사용자 지정하기 위해 설정할 수 있는 옵션에 대해 설명합니다.

옵션
설명
FullDocument
Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
FullDocumentBeforeChange
Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
ResumeAfter
Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime.
StartAfter
Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime.
StartAtOperationTime
Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp.
StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter.
MaxAwaitTime
Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
ShowExpandedEvents
Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True.
BatchSize
Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.
Collation
Specifies the collation to use for the change stream cursor.
Comment
Attaches a comment to the operation.

중요

배포에서 MongoDB v6.0 이상을 사용하는 경우에만 컬렉션에서 사전 이미지 및 사후 이미지를 활성화할 수 있습니다.

기본적으로 컬렉션에서 작업을 수행할 때 해당 변경 이벤트에는 해당 작업에 의해 수정된 필드의 델타만 포함됩니다. 변경 전후의 전체 문서를 보려면 ChangeStreamOptions 객체를 만들고 FullDocumentBeforeChange 또는 FullDocument 옵션을 지정합니다. 그런 다음 ChangeStreamOptions 객체를 Watch() 또는 WatchAsync() 메서드에 전달합니다.

사전 이미지 는 변경 전의 문서 전체 버전입니다. 변경 스트림 이벤트에 사전 이미지를 포함하려면 FullDocumentBeforeChange 옵션을 다음 값 중 하나로 설정합니다.

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: 변경 이벤트에는 사전 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사전 이미지가 포함됩니다.

  • ChangeStreamFullDocumentBeforeChangeOption.Required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사전 이미지가 포함됩니다. 사전 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

사후 이미지 는 변경 문서의 전체 버전입니다. 변경 스트림 이벤트에 사후 이미지를 포함하려면 FullDocument 옵션을 다음 값 중 하나로 설정합니다.

  • ChangeStreamFullDocumentOption.UpdateLookup: 변경 이벤트에는 변경 후 일정 시간 이후의 변경된 문서 전체의 복사본이 포함됩니다.

  • ChangeStreamFullDocumentOption.WhenAvailable: 변경 이벤트에는 사후 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사후 이미지가 포함됩니다.

  • ChangeStreamFullDocumentOption.Required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사후 이미지가 포함됩니다. 사후 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

다음 예시에서는 컬렉션에서 변경 스트림을 열고 FullDocument 옵션을 지정하여 업데이트된 문서의 사후 이미지를 포함합니다. Asynchronous 또는 Synchronous 탭을 선택하여 해당 코드를 확인합니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = _restaurantsCollection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}

앞의 코드 예시를 실행하고 "name" 값이 "Blarney Castle" 인 문서를 업데이트하면 다음과 같은 변경 스트림 출력이 생성됩니다.

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

사전 이미지 및 사후 이미지에 대해 자세히 알아보려면 Change Streams 매뉴얼에서 문서 사전 및 사후 이미지로 MongoDB Server 을 참조하세요.

변경 스트림에 대해 자세히 알아보려면 Change Streams 매뉴얼의 MongoDB Server 을 참조하세요.

이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.

돌아가기

문서 수 계산

다음

빌더와의 운영