Docs Menu
Docs Home
/ / /
Scala
/

변경 스트림

이 페이지의 내용

  • 전제 조건
  • MongoDB 배포에 연결하기
  • 컬렉션의 모든 변경 사항 보기
  • 데이터베이스의 변경 사항 확인
  • 모든 데이터베이스의 변경 사항 확인
  • 콘텐츠 필터링

MongoDB Server 버전 3.6 에는 $changeStream 집계 파이프라인 연산자가 도입되었습니다.

변경 스트림을 사용하면 컬렉션 의 문서에 대한 변경 사항을 관찰할 수 있습니다. 이 새 단계의 유용성을 개선하기 위해 MongoCollection 유형에는 watch() 메서드가 포함되어 있습니다. ChangeStreamObservable 인스턴스 는 변경 스트림 을 설정하고 잠재적으로 복구 가능한 오류가 발생하면 자동으로 재개를 시도합니다.

이 가이드의 코드 예제를 실행하려면 다음 구성 요소를 설정해야 합니다.

  • 문서 자산 에 test.restaurants 있는 파일 restaurants.json 의 문서로 Github 채워진 컬렉션 입니다.

  • 다음 가져오기 문:

import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._

참고

이 가이드 에서는 퀵 스타트 프라이머에 설명된 대로 Observable 암시를 사용합니다.

먼저 MongoDB deployment에 연결한 다음 MongoDatabaseMongoCollection 인스턴스를 선언하고 정의합니다.

다음 코드는 포트 27017localhost 에서 실행되는 독립형 MongoDB 배포서버에 연결합니다. 그런 다음 test 데이터베이스를 참조하는 database 변수와 restaurants 컬렉션을 참조하는 collection 변수를 정의합니다.

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

MongoDB deployment에 연결하는 방법에 대해 자세히 알아보려면 MongoDB에 연결 튜토리얼을 참조하세요.

변경 스트림을 만들려면 MongoCollection.watch() 메서드 중 하나를 사용합니다.

다음 예시 에서 변경 스트림 은 관찰한 모든 변경 사항을 출력합니다.

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch

애플리케이션은 단일 변경 스트림 을 열어 데이터베이스 의 모든 비시스템 컬렉션을 감시할 수 있습니다. 이러한 변경 스트림 을 만들려면 MongoDatabase.watch() 메서드 중 하나를 사용합니다.

다음 예제에서 변경 스트림은 지정된 데이터베이스에서 관찰되는 모든 변경 사항을 출력합니다.

val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch

애플리케이션은 단일 변경 스트림을 열어 MongoDB 배포에 있는 모든 데이터베이스의 모든 비시스템 컬렉션을 감시할 수 있습니다. 이러한 변경 스트림을 만들려면 MongoClient.watch() 메서드 중 하나를 사용합니다.

다음 예시 에서 변경 스트림 은 MongoClient 가 연결된 배포서버 서버에서 관찰되는 모든 변경 사항을 출력합니다.

val observer = LatchedObserver()
client.watch().subscribe(observer)
observer.await() // Block waiting for the latch

집계 단계 목록을 watch() 메서드에 전달하여 $changeStream 연산자 에서 반환한 데이터를 수정할 수 있습니다.

참고

모든 애그리게이션 연산자가 지원되는 것은 아닙니다. 자세한 내용은 MongoDB Server 매뉴얼의 Change Streams 를 참조하세요.

다음 예시 에서 변경 스트림 은 insert, update, replacedelete 연산에 해당하는 모든 변경 사항을 출력합니다.

먼저 파이프라인 에는 operationTypeinsert, update, replace 또는 delete 중 하나인 문서를 필터하다 하는 $match 단계가 포함되어 있습니다. 그런 다음 fullDocumentFullDocument.UPDATE_LOOKUP 로 설정하여 업데이트 후의 문서 가 결과에 포함되도록 합니다.

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch

돌아가기

애그리게이션 프레임워크