Docs Menu
Docs Home
/ / /
Scala
/

Observables

이 페이지의 내용

  • 관찰 가능
  • SingleObservable
  • 서브스크립션
  • 관찰자
  • 배압
  • 관찰 가능한 헬퍼
  • SingleObservable

Scala 드라이버는 비동기 및 비차단 드라이버입니다. Observable 모델을 구현하면 비동기 이벤트가 중첩된 콜백의 복잡성에서 벗어나 간단하고 구성 가능한 작업이 됩니다.

비동기 작업의 경우 세 가지 인터페이스가 있습니다.

  • 관찰 가능

  • 서브스크립션

  • 관찰자

참고

이 드라이버는 MongoDB Reactive Streams 드라이버 를 기반으로 구축되었으며, Reactive Streams 사양을 구현한 것입니다. ObservablePublisher 의 구현이고 ObserverSubscriber 의 구현입니다.

다음과 같은 클래스 명명 규칙이 적용됩니다.

  • Observable: 사용자 지정 구현 Publisher

  • Observer: 사용자 지정 구현 Subscriber

  • Subscription

Observable 은(는) 확장된 Publisher 구현이며, 일반적으로 Subscription 에서 Observable 로의 요청에 따라 결과를 Observer 로 내보내는 MongoDB 작업을 나타냅니다.

중요

Observable 부분 함수로 생각할 수 있습니다. 부분 함수와 마찬가지로 호출될 때까지 아무 일도 발생하지 않습니다. Observable 는 여러 번 구독할 수 있으며, 각 구독은 MongoDB를 쿼리하거나 데이터를 삽입하는 등의 새로운 부작용을 일으킬 수 있습니다.

SingleObservable 특성은 Publisher 단일 항목만 반환하는 구현입니다. 일반 Observable 와 같은 방식으로 사용할 수 있습니다.

SubscriptionObservable 를 구독하는 Observer 의 일대일 라이프사이클을 나타냅니다. Subscription ~ Observable 는 단일 Observer 에서만 사용할 수 있습니다. Subscription 의 목적은 수요를 제어하고 Observable 의 구독 취소를 허용하는 것입니다.

ObserverObservable 로부터 푸시 기반 알림을 수신하기 위한 메커니즘을 제공합니다. 이러한 이벤트에 대한 수요는 Subscription 로 표시됩니다.

Observable[TResult] 을(를) 구독하면 Observer 이(가) onSubscribe(subscription: Subscription) 메서드를 통해 Subscription 으)로 전달됩니다. 결과에 대한 수요는 Subscription 를 통해 신호를 보내고 모든 결과는 onNext(result: TResult) 메서드로 전달됩니다. 어떤 이유로든 오류가 발생하면 onError(e: Throwable) 메서드가 호출되고 Observer 에 더 이상 이벤트가 전달되지 않습니다. 또는 ObserverObservable 의 모든 결과를 소비하면 onComplete() 메서드가 호출됩니다.

다음 예제에서는 Subscription 을(를) 사용하여 Observable 을(를) 반복할 때 수요를 제어합니다. 기본 Observer 구현은 모든 데이터를 자동으로 요청합니다. 아래에서는 Observable 의 수요 기반 반복을 관리할 수 있도록 onSubscribe() 메서드 사용자 지정을 재정의합니다.

collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})

org.mongodb.scala 패키지는 Publisher 유형과의 향상된 상호 작용을 제공합니다. 확장 기능에는 익명 함수를 통한 간단한 구독이 포함됩니다.

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))

org.mongodb.scala 패키지에는 Publisher 또는 Observable 인스턴스를 더 간단하게 연결하고 작업할 수 있도록 다음과 같은 모나딕 연산자도 제공하는 암시적 클래스가 포함되어 있습니다.

GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})

다음 목록에서는 사용 가능한 모나딕 연산자에 대해 설명합니다.

  • andThen: Observable 인스턴스의 체인을 허용합니다.

  • collect: 모든 결과를 시퀀스로 수집합니다.

  • fallbackTo: 오류가 있는 경우 대체 Observable 로 대체할 수 있습니다.

  • filter: Observable 의 결과를 필터링합니다.

  • flatMap: Observable 의 각 결과에 함수를 적용하여 새 Observable 을 생성합니다.

  • foldLeft: 적용된 축적자 함수의 단일 결과를 포함하는 새 Observable 를 만듭니다.

  • foreach: 방출된 각 결과에 적용된 함수를 적용합니다.

  • head: Future 에 있는 Observable 의 헤드를 반환합니다.

  • map: Observable 의 방출된 각 결과에 함수를 적용하여 새 Observable 을 생성합니다.

  • observeOn: 향후 작업을 위해 특정 ExecutionContext 를 사용하는 새 Observable 를 만듭니다.

  • recover: 이 Observable 에 다른 Observable 값을 할당하여 포함할 수 있는 일치하는 모든 스로어블을 처리할 새 Observable 를 만듭니다.

  • recoverWith: 이 Observable 에 포함될 수 있는 일치하는 모든 스로어블을 처리할 새 Observable 를 만듭니다.

  • toFuture:Observable 결과를 수집하여 로 Future 변환합니다.

  • transform: 방출된 각 결과에 resultFunction 함수를 적용하여 새 Observable 를 만듭니다.

  • withFilter: Observable 인스턴스에 대한 for-comprehensions 지원을 제공합니다.

  • zip: 이 값과 다른 Observable 값을 압축하고 결과의 튜플을 포함하는 새 Observable 를 만듭니다.

Boxedpublisher 보기 각 연산자에 대해 자세히 알아보려면 API 문서를 참조하세요.

SingleObservable[T] 는 단일 항목을 반환하므로 toFuture() 메서드는 헤드 메서드와 동일한 방식으로 Future[T] 를 반환합니다. PublisherSingleObservable 로 변환하는 암시적 변환기도 있습니다.

돌아가기

모니터링