Docs 菜单
Docs 主页
/ / /
Scala
/

Observables

在此页面上

  • 可观察
  • SingleObservable
  • 订阅
  • 观察者
  • 背压
  • 可观察助手
  • SingleObservable

Scala 驱动程序是异步非阻塞驱动程序。 通过实现 Observable模型,异步事件变得简单、可组合的操作,并且摆脱了嵌套回调的复杂性。

对于异步操作,提供了三个接口:

  • 可观察

  • 订阅

  • 观察者

注意

该驱动程序基于MongoDB Reactive Streams 驱动程序构建,是响应式流规范的实现。 ObservablePublisher的实现,而ObserverSubscriber的实现。

以下类命名规则适用:

  • Observable:自定义实现 Publisher

  • Observer:自定义实现 Subscriber

  • Subscription

是扩展的Observable Publisher实现,通常它表示 MongoDB 操作,该操作根据从Observer 到 的请求将其结果发送到Subscription Observable

重要

Observable 可以视为偏函数。 与偏函数一样,在调用它们之前不会发生任何事情。 Observable可以订阅多次,每次订阅都可能导致新的副作用,例如查询 MongoDB 或插入数据。

SingleObservable 特征是仅返回单个项目的Publisher 实现。它的使用方式与普通Observable相同。

Subscription表示订阅ObservableObserver的一对一生命周期。 SubscriptionObservable 只能由单个Observer 使用。Subscription的用途是控制需求并允许取消订阅Observable

Observer提供了从Observable接收基于推送的通知的机制。 对这些事件的需求由其Subscription

订阅Observable[TResult]后,将通过onSubscribe(subscription: Subscription)方法向Observer传递Subscription 。 对结果的需求通过Subscription ,任何结果都传递给onNext(result: TResult)方法。 如果由于任何原因出现错误,则会调用onError(e: Throwable)方法,并且不会再将事件传递到Observer 。 或者,当Observer消耗完来自Observable的所有结果时,将调用onComplete()方法。

在以下示例中, Subscription用于在迭代Observable时控制需求。 默认的Observer实现会自动请求所有数据。 下面我们将重写onSubscribe()自定义方法,以便管理Observable的需求驱动迭代:

collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})

org.mongodb.scala包改进了与Publisher类型的交互。 扩展功能包括通过匿名函数进行简单订阅:

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))

org.mongodb.scala包包含一个隐式类,该类还提供以下 Monadic 操作符,以使PublisherObservable实例的链接和使用变得更简单:

GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})

以下列表描述了可用的 Monadic 操作符:

  • andThen:允许链接Observable实例。

  • collect:将所有结果收集到一个序列中。

  • fallbackTo:允许在出现故障时回退到备用Observable

  • filter:筛选Observable的结果。

  • flatMap:通过对Observable 的每个结果应用一个函数来创建新的Observable

  • foldLeft:创建一个新的Observable ,其中包含应用的累加器函数的单个结果。

  • foreach:应用应用于每个发出结果的函数。

  • head:返回FutureObservable的头部。

  • map:通过将函数应用于Observable 的每个发出的结果来创建新的Observable

  • observeOn:创建一个新的Observable ,它将特定的ExecutionContext用于将来的操作。

  • recover:创建一个新的Observable ,它将处理此Observable可能包含的任何匹配的可抛出对象,方法是为其分配另一个Observable的值。

  • recoverWith:创建一个新的Observable ,它将处理此Observable可能包含的任何匹配的可抛出对象。

  • toFuture:收集Observable结果并将其转换为Future

  • transform:通过将resultFunction函数应用于每个发出的结果来创建新的Observable

  • withFilter:为Observable实例提供 for-推导式支持。

  • zip:压缩此 Observable 和另一个的值,并创建一个新的Observable来保存其结果的元组。

请参阅 BoxedPublisher API文档以学习;了解有关每个操作符的更多信息。

由于SingleObservable[T]会返回单个项目,因此toFuture()方法会以与 head 方法相同的方式返回Future[T] 。 还有一个隐式转换器,可将Publisher转换为SingleObservable

后退

监控