Observables
Scala 드라이버는 비동기 및 비차단 드라이버입니다. Observable
모델을 구현하면 비동기 이벤트가 중첩된 콜백의 복잡성에서 벗어나 간단하고 구성 가능한 작업이 됩니다.
비동기 작업의 경우 세 가지 인터페이스가 있습니다.
참고
이 드라이버는 MongoDB Reactive Streams 드라이버 를 기반으로 구축되었으며, Reactive Streams 사양을 구현한 것입니다. Observable
은 Publisher
의 구현이고 Observer
는 Subscriber
의 구현입니다.
다음과 같은 클래스 명명 규칙이 적용됩니다.
Observable
: 사용자 지정 구현Publisher
Observer
: 사용자 지정 구현Subscriber
Subscription
관찰 가능
Observable
은(는) 확장된 Publisher
구현이며, 일반적으로 Subscription
에서 Observable
로의 요청에 따라 결과를 Observer
로 내보내는 MongoDB 작업을 나타냅니다.
중요
Observable
부분 함수로 생각할 수 있습니다. 부분 함수와 마찬가지로 호출될 때까지 아무 일도 발생하지 않습니다. Observable
는 여러 번 구독할 수 있으며, 각 구독은 MongoDB를 쿼리하거나 데이터를 삽입하는 등의 새로운 부작용을 일으킬 수 있습니다.
SingleObservable
SingleObservable 특성은 Publisher
단일 항목만 반환하는 구현입니다. 일반 Observable
와 같은 방식으로 사용할 수 있습니다.
서브스크립션
Subscription
는 Observable
를 구독하는 Observer
의 일대일 라이프사이클을 나타냅니다. Subscription
~ Observable
는 단일 Observer
에서만 사용할 수 있습니다. Subscription
의 목적은 수요를 제어하고 Observable
의 구독 취소를 허용하는 것입니다.
관찰자
Observer
는 Observable
로부터 푸시 기반 알림을 수신하기 위한 메커니즘을 제공합니다. 이러한 이벤트에 대한 수요는 Subscription
로 표시됩니다.
Observable[TResult]
을(를) 구독하면 Observer
이(가) onSubscribe(subscription:
Subscription)
메서드를 통해 Subscription
으)로 전달됩니다. 결과에 대한 수요는 Subscription
를 통해 신호를 보내고 모든 결과는 onNext(result:
TResult)
메서드로 전달됩니다. 어떤 이유로든 오류가 발생하면 onError(e:
Throwable)
메서드가 호출되고 Observer
에 더 이상 이벤트가 전달되지 않습니다. 또는 Observer
가 Observable
의 모든 결과를 소비하면 onComplete()
메서드가 호출됩니다.
배압
다음 예제에서는 Subscription
을(를) 사용하여 Observable
을(를) 반복할 때 수요를 제어합니다. 기본 Observer
구현은 모든 데이터를 자동으로 요청합니다. 아래에서는 Observable
의 수요 기반 반복을 관리할 수 있도록 onSubscribe()
메서드 사용자 지정을 재정의합니다.
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
관찰 가능한 헬퍼
The org.mongodb.scala
package provides improved interaction with Publisher
types. 확장 기능에는 익명 함수를 통한 간단한 구독 이 포함됩니다.
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala
패키지에는 Publisher
또는 Observable
인스턴스를 더 간단하게 연결하고 작업할 수 있도록 다음과 같은 모나딕 연산자도 제공하는 암시적 클래스가 포함되어 있습니다.
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
다음 목록에서는 사용 가능한 모나딕 연산자에 대해 설명합니다.
andThen
:Observable
인스턴스의 체인을 허용합니다.collect
: 모든 결과를 시퀀스로 수집합니다.fallbackTo
: 오류가 있는 경우 대체Observable
로 대체할 수 있습니다.filter
:Observable
의 결과를 필터링합니다.flatMap
:Observable
의 각 결과에 함수를 적용하여 새Observable
을 생성합니다.foldLeft
: 적용된 축적자 함수의 단일 결과를 포함하는 새Observable
를 만듭니다.foreach
: 방출된 각 결과에 적용된 함수를 적용합니다.head
:Future
에 있는Observable
의 헤드를 반환합니다.map
:Observable
의 방출된 각 결과에 함수를 적용하여 새Observable
을 생성합니다.observeOn
: 향후 작업을 위해 특정ExecutionContext
를 사용하는 새Observable
를 만듭니다.recover
: 이Observable
에 다른Observable
값을 할당하여 포함할 수 있는 일치하는 모든 스로어블을 처리할 새Observable
를 만듭니다.recoverWith
: 이Observable
에 포함될 수 있는 일치하는 모든 스로어블을 처리하다 할 새Observable
를 만듭니다.toFuture
:Observable
결과를 수집하여 로Future
변환합니다.transform
: 방출된 각 결과에resultFunction
함수를 적용하여 새Observable
를 만듭니다.withFilter
:Observable
인스턴스에 대한 for-comprehensions 지원을 제공합니다.zip
: 이 값과 다른Observable
값을 압축하고 결과의 튜플을 포함하는 새Observable
를 만듭니다.
Boxedpublisher 보기 각 연산자 에 학습 보려면 API 문서를 참조하세요.
SingleObservable
SingleObservable[T]
는 단일 항목을 반환하므로 toFuture()
메서드는 헤드 메서드와 동일한 방식으로 Future[T]
를 반환합니다. Publisher
을 SingleObservable
로 변환하는 암시적 변환기도 있습니다.