Change Streams 열기
개요
이 가이드에서는 change stream을 사용하여 데이터베이스의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. change stream은 애플리케이션이 단일 collection, 데이터베이스 또는 배포에서 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다. 애그리게이션 연산자 세트를 지정하여 애플리케이션이 수신하는 데이터를 필터링하고 변환할 수 있습니다. MongoDB v6.0 이상에 연결할 때 변경 전후의 문서 데이터를 포함하도록 이벤트를 구성할 수 있습니다.
다음 섹션에서 change stream을 열고 구성하는 방법을 알아보세요.
변경 스트림 열기
change stream을 열어 특정 유형의 데이터 변경 사항을 구독하고 애플리케이션에서 변경 이벤트를 생성할 수 있습니다.
change stream을 열려면 watch()
MongoCollection
MongoDatabase
instance, 또는 인스턴스에서 메서드를 MongoClient
호출합니다.
중요
독립형 MongoDB 배포는 변경 스트림을 지원하지 않는데, 이는 이 기능에 복제본 세트 oplog가 필요하기 때문입니다. oplog 에 대해 자세히 알아보려면 복제본 세트 oplog 서버 매뉴얼 페이지를 참조하세요.
watch()
메서드를 호출하는 객체에 따라 change stream이 수신하는 이벤트의 범위가 결정됩니다.
MongoCollection
에서 watch()
를 호출하면 change stream이 collection을 모니터링합니다.
MongoDatabase
에서 watch()
를 호출하면 change stream이 해당 데이터베이스의 모든 collection을 모니터링합니다.
MongoClient
에서 watch()
를 호출하면 change stream은 연결된 MongoDB의 모든 변경 사항을 모니터링합니다.
예시
다음 코드 예제에서는 collection의 데이터가 변경될 때마다 change stream을 열고 change stream 이벤트를 인쇄하는 방법을 보여 줍니다.
// Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
collection에 대한 삽입 작업은 다음 텍스트와 유사한 출력을 생성해야 합니다.
Received a change event: ChangeStreamDocument{ operationType='insert', resumeToken={"_data": "825EC..."}, namespace=myDb.myChangeStreamCollection, ... }
실행 가능한 예시 는 Watch for Changes 사용 예시 페이지를 참조하세요.
watch()
메서드에 대해 자세히 알아보려면 다음 API 문서를 참조하세요.
change stream에 애그리게이션 연산자 적용
집계 파이프라인을 watch()
메서드에 매개변수로 전달하여 change stream이 수신할 이벤트를 지정할 수 있습니다.
사용 중인 MongoDB 서버 버전이 지원하는 애그리게이션 연산자를 알아보려면 변경 스트림 출력 수정을 참조하세요.
예시
다음 코드 예제에서는 집계 파이프라인을 적용하여 삽입 및 업데이트 작업에 대해서만 이벤트를 수신하도록 change stream을 구성하는 방법을 보여 줍니다.
val pipeline = listOf( Aggregates.match(Filters.`in`("operationType", listOf("insert", "update"))) ) // Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
변경 스트림이 변경 업데이트 이벤트를 수신하면 앞의 코드 예시는 다음 텍스트를 출력합니다.
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, ...
대규모 change stream 이벤트 분할
MongoDB v7.0 이상에 연결할 때 $changeStreamSplitLargeEvent
애그리게이션 연산자를 사용하여 16MB를 초과하는 이벤트 문서를 더 작은 조각으로 분할할 수 있습니다.
change stream 이벤트가 문서 크기 제한을 초과할 것으로 예상되는 경우에만 $changeStreamSplitLargeEvent
연산자를 사용합니다. 예를 들어, 애플리케이션에 전체 문서 사전 이미지 또는 사후 이미지가 필요한 경우 이 기능을 사용할 수 있습니다.
$changeStreamSplitLargeEvent
애그리게이션 단계에서는 프래그먼트를 순차적으로 반환합니다. 변경 스트림 커서를 사용하여 프래그먼트에 액세스할 수 있습니다. 각 프래그먼트 문서에는 다음 필드를 포함하는 splitEvent
객체가 포함되어 있습니다.
필드 | 설명 |
---|---|
| 에서 시작하는 프래그먼트의 인덱스 |
| 분할 이벤트를 구성하는 총 프래그먼트 수 |
다음 예에서는 대규모 이벤트를 split하기 위해 $changeStreamSplitLargeEvent
집계 단계가 있는 집계 파이프라인이 포함된 change stream을 엽니다.
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument())) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
참고
집계 파이프라인에는 $changeStreamSplitLargeEvent
단계가 하나만 있을 수 있으며 이 단계가 파이프라인의 마지막 단계여야 합니다.
$changeStreamSplitLargeEvent
애그리게이션 연산자에 대해 자세히 알아보려면 MongoDB Server 매뉴얼에서 $changeStreamSplitLargeEvent(애그리게이션) 를 참조하세요.
사전 이미지 및 사후 이미지 포함하기
다음 데이터를 포함하거나 생략하도록 변경 이벤트를 구성할 수 있습니다.
작업 전의 문서의 버전을 나타내는 문서인 사전 이미지 가 존재하는 경우
작업 후 문서의 버전을 나타내는 문서인 사후 이미지(있는 경우).
사전 이미지 또는 사후 이미지가 포함된 변경 스트림 이벤트를 수신하려면 MongoDB v6.0 이상 배포에 연결하고 다음을 설정해야 합니다.
MongoDB deployment에서 collection에 대한 사전 이미지 및 사후 이미지를 활성화합니다.
팁
배포에서 이러한 기능을 활성화하는 방법을 알아보려면 Change Streams MongoDB 서버 사전 및 사후 이미지 문서를 사용한 매뉴얼 페이지를 참조하세요.
사전 이미지 및 사후 이미지가 활성화된 컬렉션을 생성하도록 드라이버에 지시하는 방법을 알아보려면 사전 이미지 및 사후 이미지가 활성화 된 컬렉션 생성 섹션을 참조하세요.
사전 이미지와 사후 이미지 중 하나 또는 둘 다를 검색하도록 change stream을 구성합니다.
팁
사전 이미지를 포함하도록 변경 스트림을 구성하려면 사전 이미지 구성 예시를 참조하세요.
사후 이미지를 포함하도록 변경 스트림을 구성하려면 사후 이미지 구성 예시를 참조하세요.
사전 이미지 및 사후 이미지를 활성화하여 collection 만들기
드라이버를 사용하여 사전 이미지 및 사후 이미지 옵션을 사용하여 collection을 ChangeStreamPreAndPostImagesOptions
만들려면 createCollection()
다음 예와 같이 인스턴스를 지정하고 메서드를 호출합니다.
val collectionOptions = CreateCollectionOptions() collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true)) database.createCollection("myChangeStreamCollection", collectionOptions)
MongoDB Shell에서 collMod
명령을 실행하여 기존 컬렉션의 사전 이미지 및 사후 이미지 옵션을 변경할 수 있습니다. 이 작업을 수행하는 방법을 알아보려면 collMod 서버 매뉴얼 설명서를 참조하세요.
경고
collection에서 이 옵션을 수정할 때 사전 이미지 또는 사후 이미지를 수신해야 하도록 구성된 경우 애플리케이션에서 해당 collection에서 열려 있는 change stream이 실패할 수 있습니다.
사전 이미지 구성 예시
다음 코드 예제에서는 사전 이미지를 포함하고 결과를 출력하도록 변경 스트림을 구성하는 방법을 보여 줍니다.
val job = launch { val changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
앞의 예에서는 FullDocumentBeforeChange.REQUIRED
옵션을 사용하도록 change stream을 구성합니다. 이렇게 하면 교체, 업데이트 및 삭제 이벤트에 대한 사전 이미지를 반환하고 사전 이미지를 사용할 수 없는 경우 서버에서 오류를 발생시키도록 change stream을 구성합니다.
애플리케이션이 latestVersion
소프트웨어 라이브러리 종속성 collection에 있는 문서의 필드를 값에서 2.0.0
로 업데이트했다고 2.1.0
가정해 보겠습니다. 앞의 코드 예제의 해당 변경 이벤트 출력은 다음 텍스트와 유사해야 합니다.
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...} namespace=software.libraries, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}}, ...
옵션 목록은 FullDocumentBeforeChange 를 참조하세요. API 문서.
사후 이미지 구성 예시
다음 코드 예제에서는 사후 이미지를 포함하고 결과를 출력하도록 변경 스트림을 구성하는 방법을 보여 줍니다.
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
앞의 예에서는 FullDocument.UPDATE_LOOKUP
옵션을 사용하도록 change stream을 구성합니다. 이렇게 하면 원본 및 변경된 문서 간의 델타와 변경이 발생한 후 특정 시점의 문서 사본 간의 델타를 모두 반환하도록 change stream을 구성합니다.
애플리케이션이 document의 필드를 도시 인구 조사 데이터 collection에서 의 population
값에서 로 업데이트했다고 가정해 보겠습니다.800
950
앞의 코드 예제의 해당 변경 이벤트 출력은 다음 텍스트와 유사해야 합니다.
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, namespace=censusData.cities, destinationNamespace=null, fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}}, updatedFields={"population": 950}, ... ...
옵션 목록은 FullDocument 를 참조하세요. API 문서.