변화를 주시하세요
변경 스트림을 열어 collection, 데이터베이스 또는 배포서버에 대한 변경 사항과 같은 MongoDB의 데이터 변경 사항을 추적할 수 있습니다. 변경 스트림을 통해 애플리케이션은 데이터의 변경 사항을 감시하고 이에 대응할 수 있습니다.
변경 스트림은 변경 사항 발생 시 변경 이벤트 문서를 반환합니다. 변경 이벤트에는 업데이트 데이터에 대한 정보가 포함되어 있습니다.
다음 코드 예제와 같이 MongoCollection
, MongoDatabase
또는 MongoClient
객체에서 watch()
메서드를 호출하여 변경 스트림을 엽니다.
val changeStream = collection.watch()
watch()
메서드는 선택적으로 여러 단계의 배열로 구성된 집계 파이프라인을 첫 번째 매개변수로 사용하여 다음과 같이 변경 이벤트 출력을 필터링하고 변환합니다:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
watch()
메서드는 결과에 액세스, 구성, 탐색을 위한 여러 메서드를 제공하는 클래스인 ChangeStreamFlow
의 인스턴스를 반환합니다. ChangeStreamFlow
는 또한 Kotlin 코루틴 라이브러리의 상위 클래스 Flow
의 메서드를 상속합니다.
ChangeStreamFlow
에서 collect()
를 호출하여 이벤트가 발생할 때 처리할 수 있습니다. 또는 Flow
에 내장된 다른 메서드를 사용하여 결과로 작업할 수 있습니다.
변경 스트림에서 반환된 문서를 처리하기 위한 옵션을 구성하려면 watch()
에서 반환한 ChangeStreamFlow
객체의 멤버 메서드를 사용합니다. 사용 가능한 메서드에 대한 자세한 내용은 이 예시 하단에 있는 ChangeStreamFlow
API 문서 링크를 참조하세요.
.collect()를 사용하여 change stream 이벤트 처리
change stream에서 이벤트를 캡처하려면 아래와 같이 collect()
메서드를 호출합니다.
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
.collect()
함수는 변경 이벤트가 발생할 때 Atlas Triggers됩니다. 함수에서 이벤트 문서가 수신되면 처리하도록 로직을 지정할 수 있습니다.
참고
업데이트 작업 변경 이벤트의 경우 변경 스트림은 기본적으로 업데이트된 문서 전체가 아니라 수정된 필드만 반환합니다. 다음과 같이 값이 FullDocument.UPDATE_LOOKUP
인 ChangeStreamFlow
객체의 fullDocument()
멤버 메서드를 호출하여 문서의 최신 버전도 반환하도록 변경 스트림을 구성할 수 있습니다.
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
예시
다음 예제 애플리케이션은 sample_mflix
데이터베이스의 movies
컬렉션에서 변경 스트림을 엽니다. 애플리케이션은 집계 파이프라인을 사용하여 operationType
을 기준으로 변경 사항을 필터링하여 삽입 및 업데이트 이벤트만 수신합니다. 삭제는 생략되어 제외됩니다. 애플리케이션은 .collect()
메서드를 사용하여 컬렉션에서 발생하는 필터링된 변경 이벤트를 수신하고 출력합니다.
애플리케이션이 별도의 코루틴 작업에서 collect()
작업을 실행하여 change stream이 열려 있는 동안 애플리케이션을 계속 실행할 수 있습니다. 작업이 완료되면 애플리케이션은 change stream을 닫고 종료됩니다.
참고
이 예시 에서는 연결 URI를 사용하여 MongoDB 인스턴스 에 연결합니다. MongoDB 인스턴스 에 연결하는 방법에 학습 보려면 연결 가이드 를 참조하세요.
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
이 페이지에 언급된 클래스 및 메서드에 대한 추가 정보는 다음 리소스를 참조하세요.
Change Streams 서버 수동 입력
변경 이벤트 서버 수동 입력
집계 파이프라인 서버 수동 입력
집계 단계 서버 수동 입력
ChangeStreamFlow API 문서
MongoCollection.watch() API 설명서
MongoDatabase.watch() API 설명서
MongoClient.watch() API 문서