Observables
Scala ドライバーは 、非同期の非ブロッキング ドライバーです。 Observable
モデルを実装することで、非同期イベントは単純で構成可能な操作になり、ネストされたコールバックの複雑さはありません。
非同期操作の場合、次の 3 つのインターフェースがあります。
注意
ドライバーはMongoDB Reactive Streams ドライバー上にビルドされ、リアクティブ ストリーム仕様の実装です。 Observable
はPublisher
の実装であり、 Observer
はSubscriber
の実装です。
次のクラス命名規則が適用されます。
Observable
: のカスタム実装Publisher
Observer
: のカスタム実装Subscriber
Subscription
Observable
Observable
は拡張Publisher
の実装であり、一般に、 Subscription
からObservable
へのリクエストに基づいてObserver
に結果を出力する MongoDB 操作を表します。
重要
Observable
は、部分的な関数と考えることができます。 部分的な関数と同様に、呼び出されるまで何も発生しません。 Observable
は複数回サブスクライブできます。サブスクライブごとに、MongoDB のクエリやデータの挿入など、新しい副作用が発生する可能性があります。
SingleObservable
The SingleObservable trait は、1Publisher
つのアイテムのみを返す の実装です。通常のObservable
と同じ方法で使用できます。
サブスクリプション
Subscription
は、 Observer
がObservable
にサブスクライブする 1 対 1 のライフサイクルを表します。 Observable
Subscription
は、単一のObserver
でのみ使用できます。 Subscription
の目的は、需要を制御し、 Observable
からのサブスクリプションを許可することです。
観察者
Observer
は、 Observable
からプッシュベースの通知を受信するメカニズムを提供します。 これらのイベントの需要は、 Subscription
によって示されます。
Observable[TResult]
にサブスクライブすると、 Observer
はonSubscribe(subscription:
Subscription)
メソッドを介してSubscription
に渡されます。 結果の要求はSubscription
を介してシグナルされ、すべての結果はonNext(result:
TResult)
メソッドに渡されます。 何らかの理由でエラーが発生した場合、 onError(e:
Throwable)
メソッドが呼び出され、それ以上のイベントがObserver
に渡されません。 あるいは、 Observer
がObservable
からの結果をすべて消費した場合、 onComplete()
メソッドが呼び出されます。
バックプレッシャー
次の例では、 Observable
を反復処理するときに需要を制御するためにSubscription
が使用されています。 デフォルトのObserver
実装はすべてのデータを自動的にリクエストします。 以下では、 onSubscribe()
メソッドのカスタムをオーバーライドして、 Observable
の要求に応じた反復を管理できるようにします。
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
観察可能なヘルパー
org.mongodb.scala
パッケージは、 Publisher
型との相互作用を改善します。 拡張機能には、匿名関数による単純なサブスクライブが含まれます。
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala
パッケージには暗黙的な クラスが含まれています。このクラスでは、 Publisher
またはObservable
インスタンスのチェーンと操作を簡素化するための次の MongoDB 演算子も提供されています。
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
次のリストでは、使用可能な MongoDB 演算子を説明しています。
andThen
:Observable
インスタンスの連鎖を許可します。collect
: すべての結果をシーケンスに収集します。fallbackTo
: 障害が発生した場合に代替のObservable
にフォールバックできるようにします。filter
:Observable
の結果をフィルタリングします。flatMap
:Observable
の各結果に関数を適用して新しいObservable
を作成します。foldLeft
: 適用されたアキュムレータ関数の単一の結果を含む新しいObservable
を作成します。foreach
: 出力された結果それぞれに適用される関数を適用します。head
: 内のObservable
Future
のヘッドを返します。map
:Observable
のそれぞれの出力結果に関数を適用して、新しいObservable
を作成します。observeOn
: 将来の操作に特定のExecutionContext
を使用する新しいObservable
を作成します。recover
: 別のObservable
の値を割り当てて、このObservable
に含まれる可能性のある一致する例外を処理する新しいObservable
を作成します。recoverWith
: このObservable
に含まれる可能性のある一致するスローオブジェクトを処理する新しいObservable
を作成します。toFuture
:Observable
の結果を収集し、Future
に変換します。transform
: 出力された結果ごとにresultFunction
関数を適用して、新しいObservable
を作成します。withFilter
:Observable
インスタンスに大文字と小文字のサポートを提供します。zip
: この と別のObservable
の値を圧縮し、その結果の 1 倍を保持する新しいObservable
を作成します。
BoxedPublisher を参照してください 各演算子の詳細については、 API ドキュメント を参照してください。
SingleObservable
SingleObservable[T]
は 1 つのアイテムを返すため、 toFuture()
メソッドは ヘッド メソッドと同じ方法でFuture[T]
を返します。 また、 Publisher
をSingleObservable
に変換する暗黙的な変換機能もあります。