Observables
Scala 驱动程序是异步非阻塞驱动程序。 通过实现 Observable
模型,异步事件变得简单、可组合的操作,并且摆脱了嵌套回调的复杂性。
对于异步操作,提供了三个接口:
注意
该驱动程序基于MongoDB Reactive Streams 驱动程序构建,是响应式流规范的实现。 Observable
是Publisher
的实现,而Observer
是Subscriber
的实现。
以下类命名规则适用:
Observable
:自定义实现Publisher
Observer
:自定义实现Subscriber
Subscription
可观察
是扩展的Observable
Publisher
实现,通常它表示 MongoDB 操作,该操作根据从Observer
到 的请求将其结果发送到Subscription
Observable
。
重要
Observable
可以视为偏函数。 与偏函数一样,在调用它们之前不会发生任何事情。 Observable
可以订阅多次,每次订阅都可能导致新的副作用,例如查询 MongoDB 或插入数据。
SingleObservable
SingleObservable 特征是仅返回单个项目的Publisher
实现。它的使用方式与普通Observable
相同。
订阅
Subscription
表示订阅Observable
的Observer
的一对一生命周期。 Subscription
的Observable
只能由单个Observer
使用。Subscription
的用途是控制需求并允许取消订阅Observable
。
观察者
Observer
提供了从Observable
接收基于推送的通知的机制。 对这些事件的需求由其Subscription
。
订阅Observable[TResult]
后,将通过onSubscribe(subscription:
Subscription)
方法向Observer
传递Subscription
。 对结果的需求通过Subscription
,任何结果都传递给onNext(result:
TResult)
方法。 如果由于任何原因出现错误,则会调用onError(e:
Throwable)
方法,并且不会再将事件传递到Observer
。 或者,当Observer
消耗完来自Observable
的所有结果时,将调用onComplete()
方法。
背压
在以下示例中, Subscription
用于在迭代Observable
时控制需求。 默认的Observer
实现会自动请求所有数据。 下面我们将重写onSubscribe()
自定义方法,以便管理Observable
的需求驱动迭代:
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
可观察助手
org.mongodb.scala
包改进了与Publisher
类型的交互。 扩展功能包括通过匿名函数进行简单订阅:
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala
包包含一个隐式类,该类还提供以下 Monadic 操作符,以使Publisher
或Observable
实例的链接和使用变得更简单:
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
以下列表描述了可用的 Monadic 操作符:
andThen
:允许链接Observable
实例。collect
:将所有结果收集到一个序列中。fallbackTo
:允许在出现故障时回退到备用Observable
。filter
:筛选Observable
的结果。flatMap
:通过对Observable
的每个结果应用一个函数来创建新的Observable
。foldLeft
:创建一个新的Observable
,其中包含应用的累加器函数的单个结果。foreach
:应用应用于每个发出结果的函数。head
:返回Future
中Observable
的头部。map
:通过将函数应用于Observable
的每个发出的结果来创建新的Observable
。observeOn
:创建一个新的Observable
,它将特定的ExecutionContext
用于将来的操作。recover
:创建一个新的Observable
,它将处理此Observable
可能包含的任何匹配的可抛出对象,方法是为其分配另一个Observable
的值。recoverWith
:创建一个新的Observable
,它将处理此Observable
可能包含的任何匹配的可抛出对象。toFuture
:收集Observable
结果并将其转换为Future
。transform
:通过将resultFunction
函数应用于每个发出的结果来创建新的Observable
。withFilter
:为Observable
实例提供 for-推导式支持。zip
:压缩此Observable
和另一个的值,并创建一个新的Observable
来保存其结果的元组。
请参阅 BoxedPublisher API文档以学习;了解有关每个操作符的更多信息。
SingleObservable
由于SingleObservable[T]
会返回单个项目,因此toFuture()
方法会以与 head 方法相同的方式返回Future[T]
。 还有一个隐式转换器,可将Publisher
转换为SingleObservable
。