변경 스트림
MongoDB Server 버전 3.6 에는 $changeStream
집계 파이프라인 연산자가 도입되었습니다.
변경 스트림을 사용하면 컬렉션 의 문서에 대한 변경 사항을 관찰할 수 있습니다. 이 새 단계의 유용성을 개선하기 위해 MongoCollection
유형에는 watch()
메서드가 포함되어 있습니다. ChangeStreamObservable
인스턴스 는 변경 스트림 을 설정하고 잠재적으로 복구 가능한 오류가 발생하면 자동으로 재개를 시도합니다.
전제 조건
이 가이드의 코드 예제를 실행하려면 다음 구성 요소를 설정해야 합니다.
A
test.restaurants
컬렉션 populated with documents from therestaurants.json
파일 in the documentation assets Github.다음 가져오기 문:
import java.util.concurrent.CountDownLatch import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.changestream._
참고
이 가이드 에서는 퀵 스타트 프라이머에 설명된 대로 Observable
암시를 사용합니다.
MongoDB 배포에 연결하기
먼저 MongoDB deployment에 연결한 다음 MongoDatabase
및 MongoCollection
인스턴스를 선언하고 정의합니다.
다음 코드는 포트 27017
의 localhost
에서 실행되는 독립형 MongoDB 배포서버에 연결합니다. 그런 다음 test
데이터베이스를 참조하는 database
변수와 restaurants
컬렉션을 참조하는 collection
변수를 정의합니다.
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants")
MongoDB deployment에 연결하는 방법에 대해 자세히 알아보려면 MongoDB에 연결 튜토리얼을 참조하세요.
컬렉션의 모든 변경 사항 보기
변경 스트림을 만들려면 MongoCollection.watch()
메서드 중 하나를 사용합니다.
다음 예시 에서 변경 스트림 은 관찰한 모든 변경 사항을 출력합니다.
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() } val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await() // Block waiting for the latch
데이터베이스의 변경 사항 확인
애플리케이션은 단일 변경 스트림 을 열어 데이터베이스 의 모든 비시스템 컬렉션을 감시할 수 있습니다. 이러한 변경 스트림 을 만들려면 MongoDatabase.watch()
메서드 중 하나를 사용합니다.
다음 예제에서 변경 스트림은 지정된 데이터베이스에서 관찰되는 모든 변경 사항을 출력합니다.
val observer = LatchedObserver() database.watch().subscribe(observer) observer.await() // Block waiting for the latch
모든 데이터베이스의 변경 사항 확인
애플리케이션은 단일 변경 스트림을 열어 MongoDB 배포에 있는 모든 데이터베이스의 모든 비시스템 컬렉션을 감시할 수 있습니다. 이러한 변경 스트림을 만들려면 MongoClient.watch()
메서드 중 하나를 사용합니다.
다음 예시 에서 변경 스트림 은 MongoClient
가 연결된 배포서버 서버에서 관찰되는 모든 변경 사항을 출력합니다.
val observer = LatchedObserver() client.watch().subscribe(observer) observer.await() // Block waiting for the latch
콘텐츠 필터링
집계 단계 목록을 watch()
메서드에 전달하여 $changeStream
연산자 에서 반환한 데이터를 수정할 수 있습니다.
참고
모든 애그리게이션 연산자가 지원되는 것은 아닙니다. 자세한 내용은 MongoDB Server 매뉴얼의 Change Streams 를 참조하세요.
다음 예시 에서 변경 스트림 은 insert
, update
, replace
및 delete
연산에 해당하는 모든 변경 사항을 출력합니다.
먼저 파이프라인 에는 operationType
이 insert
, update
, replace
또는 delete
중 하나인 문서를 필터하다 하는 $match
단계가 포함되어 있습니다. 그런 다음 fullDocument
을 FullDocument.UPDATE_LOOKUP
로 설정하여 업데이트 후의 문서 가 결과에 포함되도록 합니다.
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete"))))) .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer) observer.await() // Block waiting for the latch