Docs Menu
Docs Home
/ / /
Scala
/

Observables

項目一覧

  • Observable
  • SingleObservable
  • サブスクリプション
  • 観察者
  • バックプレッシャー
  • 観察可能なヘルパー
  • SingleObservable

Scala ドライバーは 、非同期の非ブロッキング ドライバーです。 Observableモデルを実装することで、非同期イベントは単純で構成可能な操作になり、ネストされたコールバックの複雑さはありません。

非同期操作の場合、次の 3 つのインターフェースがあります。

  • Observable

  • サブスクリプション

  • 観察者

注意

ドライバーはMongoDB Reactive Streams ドライバー上にビルドされ、リアクティブ ストリーム仕様の実装です。 ObservablePublisherの実装であり、 ObserverSubscriberの実装です。

次のクラス命名規則が適用されます。

  • Observable: のカスタム実装 Publisher

  • Observer: のカスタム実装 Subscriber

  • Subscription

Observableは拡張Publisherの実装であり、一般に、 SubscriptionからObservableへのリクエストに基づいてObserverに結果を出力する MongoDB 操作を表します。

重要

Observable は、部分的な関数と考えることができます。 部分的な関数と同様に、呼び出されるまで何も発生しません。 Observableは複数回サブスクライブできます。サブスクライブごとに、MongoDB のクエリやデータの挿入など、新しい副作用が発生する可能性があります。

The SingleObservable trait は、1Publisher つのアイテムのみを返す の実装です。通常のObservableと同じ方法で使用できます。

Subscriptionは、 ObserverObservableにサブスクライブする 1 対 1 のライフサイクルを表します。 Observable Subscriptionは、単一のObserverでのみ使用できます。 Subscriptionの目的は、需要を制御し、 Observableからのサブスクリプションを許可することです。

Observerは、 Observableからプッシュベースの通知を受信するメカニズムを提供します。 これらのイベントの需要は、 Subscriptionによって示されます。

Observable[TResult]にサブスクライブすると、 ObserveronSubscribe(subscription: Subscription)メソッドを介してSubscriptionに渡されます。 結果の要求はSubscriptionを介してシグナルされ、すべての結果はonNext(result: TResult)メソッドに渡されます。 何らかの理由でエラーが発生した場合、 onError(e: Throwable)メソッドが呼び出され、それ以上のイベントがObserverに渡されません。 あるいは、 ObserverObservableからの結果をすべて消費した場合、 onComplete()メソッドが呼び出されます。

次の例では、 Observableを反復処理するときに需要を制御するためにSubscriptionが使用されています。 デフォルトのObserver実装はすべてのデータを自動的にリクエストします。 以下では、 onSubscribe()メソッドのカスタムをオーバーライドして、 Observableの要求に応じた反復を管理できるようにします。

collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})

org.mongodb.scalaパッケージは、 Publisher型との相互作用を改善します。 拡張機能には、匿名関数による単純なサブスクライブが含まれます。

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))

org.mongodb.scalaパッケージには暗黙的な クラスが含まれています。このクラスでは、 PublisherまたはObservableインスタンスのチェーンと操作を簡素化するための次の MongoDB 演算子も提供されています。

GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})

次のリストでは、使用可能な MongoDB 演算子を説明しています。

  • andThen: Observableインスタンスの連鎖を許可します。

  • collect: すべての結果をシーケンスに収集します。

  • fallbackTo: 障害が発生した場合に代替のObservableにフォールバックできるようにします。

  • filter: Observableの結果をフィルタリングします。

  • flatMap: Observableの各結果に関数を適用して新しいObservableを作成します。

  • foldLeft: 適用されたアキュムレータ関数の単一の結果を含む新しいObservableを作成します。

  • foreach: 出力された結果それぞれに適用される関数を適用します。

  • head: 内のObservable Futureのヘッドを返します。

  • map: Observableのそれぞれの出力結果に関数を適用して、新しいObservableを作成します。

  • observeOn: 将来の操作に特定のExecutionContextを使用する新しいObservableを作成します。

  • recover: 別のObservableの値を割り当てて、このObservableに含まれる可能性のある一致する例外を処理する新しいObservableを作成します。

  • recoverWith: このObservableに含まれる可能性のある一致するスローオブジェクトを処理する新しいObservableを作成します。

  • toFuture: Observableの結果を収集し、 Futureに変換します。

  • transform: 出力された結果ごとにresultFunction関数を適用して、新しいObservableを作成します。

  • withFilter: Observableインスタンスに大文字と小文字のサポートを提供します。

  • zip: この と別のObservableの値を圧縮し、その結果の 1 倍を保持する新しいObservableを作成します。

BoxedPublisher を参照してください 各演算子の詳細については、 API ドキュメント を参照してください。

SingleObservable[T]は 1 つのアイテムを返すため、 toFuture()メソッドは ヘッド メソッドと同じ方法でFuture[T]を返します。 また、 PublisherSingleObservableに変換する暗黙的な変換機能もあります。

戻る

モニタリング