Docs Menu
Docs Home
/ / /
Kotlin 코루틴

Change Streams 열기

이 페이지의 내용

  • 개요
  • 변경 스트림 열기
  • change stream에 애그리게이션 연산자 적용
  • 대규모 change stream 이벤트 분할
  • 사전 이미지 및 사후 이미지 포함하기

이 가이드에서는 change stream을 사용하여 데이터베이스의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. change stream은 애플리케이션이 단일 collection, 데이터베이스 또는 배포에서 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다. 애그리게이션 연산자 세트를 지정하여 애플리케이션이 수신하는 데이터를 필터링하고 변환할 수 있습니다. MongoDB v6.0 이상에 연결할 때 변경 전후의 문서 데이터를 포함하도록 이벤트를 구성할 수 있습니다.

다음 섹션에서 change stream을 열고 구성하는 방법을 알아보세요.

  • 변경 스트림 열기

  • change stream에 애그리게이션 연산자 적용

  • 대규모 change stream 이벤트 분할

  • 사전 이미지 및 사후 이미지 포함하기

change stream을 열어 특정 유형의 데이터 변경 사항을 구독하고 애플리케이션에서 변경 이벤트를 생성할 수 있습니다.

change stream을 열려면 watch() MongoCollectionMongoDatabaseinstance, 또는 인스턴스에서 메서드를 MongoClient 호출합니다.

중요

독립형 MongoDB 배포는 변경 스트림을 지원하지 않는데, 이는 이 기능에 복제본 세트 oplog가 필요하기 때문입니다. oplog 에 대해 자세히 알아보려면 복제본 세트 oplog 서버 매뉴얼 페이지를 참조하세요.

watch() 메서드를 호출하는 객체에 따라 change stream이 수신하는 이벤트의 범위가 결정됩니다.

MongoCollection 에서 watch() 를 호출하면 change stream이 collection을 모니터링합니다.

MongoDatabase 에서 watch() 를 호출하면 change stream이 해당 데이터베이스의 모든 collection을 모니터링합니다.

MongoClient에서 watch() 를 호출하면 change stream은 연결된 MongoDB의 모든 변경 사항을 모니터링합니다.

다음 코드 예제에서는 collection의 데이터가 변경될 때마다 change stream을 열고 change stream 이벤트를 인쇄하는 방법을 보여 줍니다.

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

collection에 대한 삽입 작업은 다음 텍스트와 유사한 출력을 생성해야 합니다.

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

실행 가능한 예시 는 Watch for Changes 사용 예시 페이지를 참조하세요.

watch() 메서드에 대해 자세히 알아보려면 다음 API 문서를 참조하세요.

집계 파이프라인을 watch() 메서드에 매개변수로 전달하여 change stream이 수신할 이벤트를 지정할 수 있습니다.

사용 중인 MongoDB 서버 버전이 지원하는 애그리게이션 연산자를 알아보려면 변경 스트림 출력 수정을 참조하세요.

다음 코드 예제에서는 집계 파이프라인을 적용하여 삽입 및 업데이트 작업에 대해서만 이벤트를 수신하도록 change stream을 구성하는 방법을 보여 줍니다.

val pipeline = listOf(
Aggregates.match(Filters.`in`("operationType",
listOf("insert", "update")))
)
// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

변경 스트림이 변경 업데이트 이벤트를 수신하면 앞의 코드 예시는 다음 텍스트를 출력합니다.

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...

MongoDB v7.0 이상에 연결할 때 $changeStreamSplitLargeEvent 애그리게이션 연산자를 사용하여 16MB를 초과하는 이벤트 문서를 더 작은 조각으로 분할할 수 있습니다.

change stream 이벤트가 문서 크기 제한을 초과할 것으로 예상되는 경우에만 $changeStreamSplitLargeEvent 연산자를 사용합니다. 예를 들어, 애플리케이션에 전체 문서 사전 이미지 또는 사후 이미지가 필요한 경우 이 기능을 사용할 수 있습니다.

$changeStreamSplitLargeEvent 애그리게이션 단계에서는 프래그먼트를 순차적으로 반환합니다. 변경 스트림 커서를 사용하여 프래그먼트에 액세스할 수 있습니다. 각 프래그먼트 문서에는 다음 필드를 포함하는 splitEvent 객체가 포함되어 있습니다.

필드
설명
fragment
에서 시작하는 프래그먼트의 인덱스 1
of
분할 이벤트를 구성하는 총 프래그먼트 수

다음 예에서는 대규모 이벤트를 split하기 위해 $changeStreamSplitLargeEvent 집계 단계가 있는 집계 파이프라인이 포함된 change stream을 엽니다.

val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

참고

집계 파이프라인에는 $changeStreamSplitLargeEvent 단계가 하나만 있을 수 있으며 이 단계가 파이프라인의 마지막 단계여야 합니다.

$changeStreamSplitLargeEvent 애그리게이션 연산자에 대해 자세히 알아보려면 MongoDB Server 매뉴얼에서 $changeStreamSplitLargeEvent(애그리게이션) 를 참조하세요.

다음 데이터를 포함하거나 생략하도록 변경 이벤트를 구성할 수 있습니다.

  • 작업 전의 문서의 버전을 나타내는 문서인 사전 이미지 가 존재하는 경우

  • 작업 후 문서의 버전을 나타내는 문서인 사후 이미지(있는 경우).

사전 이미지 또는 사후 이미지가 포함된 변경 스트림 이벤트를 수신하려면 MongoDB v6.0 이상 배포에 연결하고 다음을 설정해야 합니다.

  • MongoDB deployment에서 collection에 대한 사전 이미지 및 사후 이미지를 활성화합니다.

    배포에서 이러한 기능을 활성화하는 방법을 알아보려면 Change Streams MongoDB 서버 사전 및 사후 이미지 문서를 사용한 매뉴얼 페이지를 참조하세요.

    사전 이미지 및 사후 이미지가 활성화된 컬렉션을 생성하도록 드라이버에 지시하는 방법을 알아보려면 사전 이미지 및 사후 이미지가 활성화 된 컬렉션 생성 섹션을 참조하세요.

  • 사전 이미지와 사후 이미지 중 하나 또는 둘 다를 검색하도록 change stream을 구성합니다.

    사전 이미지를 포함하도록 변경 스트림을 구성하려면 사전 이미지 구성 예시를 참조하세요.

    사후 이미지를 포함하도록 변경 스트림을 구성하려면 사후 이미지 구성 예시를 참조하세요.

드라이버를 사용하여 사전 이미지 및 사후 이미지 옵션을 사용하여 collection을 ChangeStreamPreAndPostImagesOptions 만들려면 createCollection() 다음 예와 같이 인스턴스를 지정하고 메서드를 호출합니다.

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

MongoDB Shell에서 collMod 명령을 실행하여 기존 컬렉션의 사전 이미지 및 사후 이미지 옵션을 변경할 수 있습니다. 이 작업을 수행하는 방법을 알아보려면 collMod 서버 매뉴얼 설명서를 참조하세요.

경고

collection에서 이 옵션을 수정할 때 사전 이미지 또는 사후 이미지를 수신해야 하도록 구성된 경우 애플리케이션에서 해당 collection에서 열려 있는 change stream이 실패할 수 있습니다.

다음 코드 예제에서는 사전 이미지를 포함하고 결과를 출력하도록 변경 스트림을 구성하는 방법을 보여 줍니다.

val job = launch {
val changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

앞의 예에서는 FullDocumentBeforeChange.REQUIRED 옵션을 사용하도록 change stream을 구성합니다. 이렇게 하면 교체, 업데이트 및 삭제 이벤트에 대한 사전 이미지를 반환하고 사전 이미지를 사용할 수 없는 경우 서버에서 오류를 발생시키도록 change stream을 구성합니다.

애플리케이션이 latestVersion 소프트웨어 라이브러리 종속성 collection에 있는 문서의 필드를 값에서 2.0.0 로 업데이트했다고 2.1.0 가정해 보겠습니다. 앞의 코드 예제의 해당 변경 이벤트 출력은 다음 텍스트와 유사해야 합니다.

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...}
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...

옵션 목록은 FullDocumentBeforeChange 를 참조하세요. API 문서.

다음 코드 예제에서는 사후 이미지를 포함하고 결과를 출력하도록 변경 스트림을 구성하는 방법을 보여 줍니다.

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

앞의 예에서는 FullDocument.UPDATE_LOOKUP 옵션을 사용하도록 change stream을 구성합니다. 이렇게 하면 원본 및 변경된 문서 간의 델타와 변경이 발생한 후 특정 시점의 문서 사본 간의 델타를 모두 반환하도록 change stream을 구성합니다.

애플리케이션이 document의 필드를 도시 인구 조사 데이터 collection에서 의 population 값에서 로 업데이트했다고 가정해 보겠습니다.800 950 앞의 코드 예제의 해당 변경 이벤트 출력은 다음 텍스트와 유사해야 합니다.

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950}, ...
...

옵션 목록은 FullDocument 를 참조하세요. API 문서.

다음

MongoDB 코틀린(Kotlin) 드라이버