Docs Menu
Docs Home
/ / /
Scala
/

데이터 변경 사항 모니터링

이 페이지의 내용

  • 개요
  • 샘플 데이터
  • 변경 스트림 열기
  • 변경 스트림 출력 수정
  • watch() 동작 수정
  • 사전 이미지 및 사후 이미지 포함하기
  • 추가 정보
  • API 문서

이 가이드 에서는 변경 스트림 을 사용하여 데이터의 실시간 변경 사항을 모니터 하는 방법을 학습 수 있습니다. 변경 스트림 은 애플리케이션 이 컬렉션, 데이터베이스 또는 배포서버 서버의 데이터 변경 사항을 구독 할 수 있도록 하는 MongoDB Server 기능 입니다.

스칼라 운전자 사용하는 경우 watch() 메서드를 호출하여 ChangeStreamObservable의 인스턴스 반환할 수 있습니다. 그런 다음 ChangeStreamObservable 인스턴스 를 구독 업데이트, 삽입 및 삭제와 같은 새로운 데이터 변경 사항을 볼 수 있습니다.

이 가이드 의 예제에서는 restaurants sample_restaurants Atlas 샘플 데이터 세트의 데이터베이스 에 있는 컬렉션 사용합니다. 스칼라 애플리케이션 에서 이 컬렉션 액세스 하려면 MongoClient Atlas cluster 에 연결하는 를 만들고 및 변수에 다음 값을 할당합니다.database collection

val database: MongoDatabase = client.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

무료 MongoDB Atlas cluster 를 생성하고 샘플 데이터 세트를 로드하는 방법을 학습 보려면 Atlas 시작하기 가이드 를 참조하세요.

일부 예에서는 LatchedObserver 클래스의 인스턴스를 사용하여 변경 스트림 이벤트를 처리하다 . 이 클래스는 변경 스트림 이벤트를 출력하고 스트림 완료되거나 오류를 생성할 때까지 데이터 변경 사항을 계속 모니터링 사용자 지정 관찰자입니다. LatchedObserver 클래스를 사용하려면 다음 코드를 애플리케이션 파일 에 붙여넣습니다.

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}

변경 스트림 을 열려면 watch() 메서드를 호출합니다. watch() 메서드를 호출하는 인스턴스 에 따라 변경 스트림 이 모니터링하는 이벤트 범위가 결정됩니다. 다음 클래스의 인스턴스에서 watch() 메서드를 호출할 수 있습니다.

  • MongoClient: 시스템 컬렉션 adminlocal또는, 및 데이터베이스의 컬렉션을 제외한 배포서버 내 모든 데이터베이스의 모든 컬렉션에 대한 변경 사항을 모니터링합니다.config

  • MongoDatabase: 하나의 데이터베이스 에 있는 모든 컬렉션의 변경 사항을 모니터링합니다.

  • MongoCollection: 하나의 컬렉션 에 대한 변경 사항을 모니터링합니다.

다음 예시 watch() 메서드를 호출하여 restaurants 컬렉션 에서 변경 스트림 엽니다. 이 코드는 변경 사항이 발생하면 이를 수신하고 출력하는 LatchedObserver 인스턴스 만듭니다.

val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await()

변경 사항을 확인하려면 앞의 코드를 실행 . 그런 다음 별도의 셸 에서 다음 코드를 실행 name 필드 값이 "Blarney Castle"인 문서 업데이트 .

val filter = equal("name", "Blarney Castle")
val update = set("cuisine", "American")
collection.updateOne(filter, update)
.subscribe((res: UpdateResult) => println(res),
(e: Throwable) => println(s"There was an error: $e"))

컬렉션 업데이트 위해 앞의 코드를 실행 변경 스트림 애플리케이션 변경 사항이 발생하는 즉시 출력합니다. 인쇄된 변경 이벤트 다음 출력과 유사합니다.

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}},
clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[],
updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null},
txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}

변경 스트림 출력을 수정하려면 파이프라인 단계 목록을 watch() 메서드에 매개 변수로 전달하면 됩니다. 목록에 다음 단계를 포함할 수 있습니다.

  • $addFields 또는 $set: 문서에 새 필드를 추가합니다.

  • $match: 문서를 필터링합니다.

  • $project: 문서 필드의 하위 집합을 프로젝션합니다.

  • $replaceWith 또는 $replaceRoot: 입력 문서 를 지정된 문서 로 바꿉니다.

  • $redact: 문서의 내용을 제한합니다.

  • $unset: 문서에서 필드를 제거합니다.

스칼라 운전자 이전 파이프라인 단계를 빌드하기 위한 헬퍼 메서드가 포함된 Aggregates 클래스를 제공합니다.

파이프라인 단계와 해당 Aggregates 도우미 메서드에 대해 자세히 학습 다음 리소스를 참조하세요.

다음 예시 Aggregates.filter() 메서드를 사용하여 $match 단계를 빌드 파이프라인 만듭니다. 그런 다음 코드는 이 파이프라인 watch() 메서드에 전달하고 업데이트 작업이 발생할 때만 이벤트를 출력하도록 watch() 에 지시합니다.

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update"))))
observer.await()

ChangeStreamObservable 클래스에서 제공하는 메서드를 연결하여 watch() 메서드의 동작을 수정할 수 있습니다. 다음 표에서는 이러한 메서드 중 일부에 대해 설명합니다.

메서드
설명

fullDocument()

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide.

fullDocumentBeforeChange()

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

comment()

Attaches a comment to the operation.

startAtOperationTime()

Instructs the change stream to provide only changes that occurred at or after the specified timestamp.

collation()

Sets the collation to use for the change stream cursor.

watch() 옵션의 전체 목록은 API 문서에서 ChangeStreamObservable을 참조하세요.

중요

배포서버 MongoDB Server v6.0 이상을 사용하는 경우에만 컬렉션에서 사전 이미지 및 사후 이미지를 활성화 할 수 있습니다.

기본값 으로 컬렉션 에서 작업을 수행할 때 해당 변경 이벤트 작업 전후에 수정된 필드와 해당 값만 포함됩니다.

watch() 메서드에 수정된 필드 외에도 문서의 사전 이미지, 변경 전 문서 의 전체 버전을 반환하도록 지시할 수 있습니다. 변경 스트림 이벤트 에 사전 이미지를 포함하려면 fullDocumentBeforeChange() 메서드를 watch() 에 연결합니다. 다음 값 중 하나를 fullDocumentBeforeChange() 메서드에 전달합니다.

  • FullDocumentBeforeChange.WHEN_AVAILABLE: 변경 이벤트 에는 변경 이벤트에 대한 수정된 문서 의 사전 이미지가 포함됩니다. 사전 이미지를 사용할 수 없는 경우 이 변경 이벤트 필드 의 값은 null 입니다.

  • FullDocumentBeforeChange.REQUIRED: 변경 이벤트 에는 변경 이벤트에 대한 수정된 문서 의 사전 이미지가 포함됩니다. 사전 이미지를 사용할 수 없는 경우 서버 에서 오류가 발생합니다.

메서드에 수정된 필드 외에도 문서의 사후 watch() 이미지 변경 후 문서 의 전체 버전을 반환하도록 지시할 수도 있습니다. 변경 스트림 이벤트 에 사후 이미지를 포함하려면 fullDocument() 메서드를 watch() 에 연결합니다. 다음 값 중 하나를 fullDocument() 메서드에 전달합니다.

  • FullDocument.UPDATE_LOOKUP: 변경 이벤트에는 변경 후 일정 시간 이후의 변경된 문서 전체의 복사본이 포함됩니다.

  • FullDocument.WHEN_AVAILABLE: 변경 이벤트 에는 변경 이벤트에 대한 수정된 문서 의 사후 이미지가 포함됩니다. 사후 이미지를 사용할 수 없는 경우 이 변경 이벤트 필드 의 값은 null 입니다.

  • FullDocument.REQUIRED: 변경 이벤트 에는 변경 이벤트에 대한 수정된 문서 의 사후 이미지가 포함됩니다. 사후 이미지를 사용할 수 없는 경우 서버 에서 오류가 발생합니다.

다음 예시 컬렉션 에서 watch() 메서드를 호출하고 fullDocument() 메서드를 연결하여 업데이트된 문서의 사후 이미지를 포함합니다.

val observer = LatchedObserver()
collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.subscribe(observer)
observer.await()

변경 스트림 애플리케이션 별도의 셸 에서 실행 restaurants 경우 앞의 업데이트 예시 사용하여 컬렉션 의 문서 업데이트하면 다음 출력과 유사한 변경 이벤트 출력됩니다.

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24",
"coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard",
"zipcode": "11697"}), (borough,BsonString{value='Queens'}),
(cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}),
(name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}),
(blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null,
documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=
UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{...}}

사전 이미지 및 사후 이미지에 대해 자세히 알아보려면 Change Streams 매뉴얼에서 문서 사전 및 사후 이미지로 MongoDB Server 을 참조하세요.

변경 스트림에 대해 자세히 알아보려면 Change Streams 매뉴얼의 MongoDB Server 을 참조하세요.

이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.

돌아가기

문서 수 계산