Observables

The Scala driver is an asynchronous and non-blocking driver. By implementing the Observable model, asynchronous events become simple, composable operations that are free from the complexity of nested callbacks.

For asynchronous operations, there are three interfaces:

  • Observable

  • Subscription

  • Observer

Note

The driver is built upon the MongoDB Reactive Streams driver and is an implementation of the reactive streams specification. Observable is an implementation of Publisher and Observer is an implementation of Subscriber.

The following class naming conventions apply:

  • Observable: custom implementation of a Publisher

  • Observer: custom implementation of a Subscriber

  • Subscription

The Observable is an extended Publisher implementation, and in general it represents a MongoDB operation which emits its results to the Observer based on a request from the Subscription to the Observable.

Important

An Observable can be thought of as a partial function. As with a partial function, nothing happens until it is called. An Observable can be subscribed to multiple times, and each subscription can cause new side effects, such as querying MongoDB or inserting data.
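
The laziness described above can be illustrated without a database. The following is a minimal sketch, assuming the Observable(...) companion factory that the driver offers for building an Observable from an in-memory collection. The object name and the counter are hypothetical and only stand in for a real side effect such as a query:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.mongodb.scala._

import scala.concurrent.Await
import scala.concurrent.duration._

object ColdObservableExample {
  // Hypothetical stand-in for a real side effect (for example, a query).
  val sideEffects = new AtomicInteger(0)

  // Defining the pipeline does not run it: no subscriber exists yet.
  val doubled: Observable[Int] = Observable(List(1, 2, 3)).map { n =>
    sideEffects.incrementAndGet()
    n * 2
  }

  // Each call subscribes again, so the mapping function runs again.
  def run(): Seq[Int] = Await.result(doubled.toFuture(), 5.seconds)
}
```

Calling run() twice should return the same results both times while the counter keeps growing, because every subscription re-executes the whole pipeline.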

The SingleObservable trait is a Publisher implementation that returns only a single item. It can be used in the same way as an ordinary Observable.

A Subscription represents a one-to-one lifecycle of an Observer subscribing to an Observable. A Subscription to an Observable can only be used by a single Observer. The purpose of a Subscription is to control demand and to allow unsubscribing from the Observable.

An Observer provides the mechanism for receiving push-based notifications from the Observable. Demand for these events is signaled by its Subscription.

Upon subscription to an Observable[TResult], the Observer is passed the Subscription through the onSubscribe(subscription: Subscription) method. Demand for results is signaled through the Subscription, and any results are passed to the onNext(result: TResult) method. If an error occurs for any reason, the onError(e: Throwable) method is called and no more events are passed to the Observer. Alternatively, when the Observer has consumed all the results from the Observable, the onComplete() method is called.

In the following example, the Subscription is used to control demand when iterating an Observable. The default Observer implementation automatically requests all the data. The example overrides the onSubscribe() method with a custom implementation so that it can manage the demand-driven iteration of the Observable:

// Assumes `collection` is an existing MongoCollection[Document]
collection.find().subscribe(new Observer[Document]() {
  val batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None

  // Keep the Subscription and request only the first batch
  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }

  // Request the next batch once the current one has been consumed
  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0
      subscription.get.request(batchSize)
    }
  }

  override def onError(e: Throwable): Unit = println(s"Error: $e")

  override def onComplete(): Unit = println("Completed")
})
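
Demand can also be signaled from outside the Observer. The following is a rough sketch rather than a driver-provided pattern: PullingObserver and its pull, received, and awaitAll members are hypothetical names, and the in-memory Observable(...) factory is assumed so that the example runs without a MongoDB deployment:

```scala
import org.mongodb.scala.{Observable, Observer, Subscription}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ManualDemandExample {

  // Hypothetical Observer that never requests data on its own.
  final class PullingObserver[T] extends Observer[T] {
    private var items = Vector.empty[T]
    private val done = Promise[Unit]()
    @volatile private var subscription: Option[Subscription] = None

    // Store the Subscription but signal no demand yet.
    override def onSubscribe(s: Subscription): Unit = subscription = Some(s)

    override def onNext(result: T): Unit = synchronized { items = items :+ result }

    override def onError(e: Throwable): Unit = { done.tryFailure(e); () }

    override def onComplete(): Unit = { done.trySuccess(()); () }

    // Ask the Observable for up to n more results.
    def pull(n: Long): Unit = subscription.foreach(_.request(n))

    // Snapshot of the results received so far.
    def received: Seq[T] = synchronized(items)

    // Block until onComplete (or onError), then return everything received.
    def awaitAll(): Seq[T] = {
      Await.result(done.future, 5.seconds)
      received
    }
  }

  def newSubscribedObserver(): PullingObserver[Int] = {
    val observer = new PullingObserver[Int]
    Observable(1 to 5).subscribe(observer)
    observer
  }
}
```

Until pull is called, the Observer should receive nothing, and it should never receive more results than the total it has requested.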

The org.mongodb.scala package provides improved interaction with Publisher types. The extended functionality includes simple subscription through anonymous functions:

// Subscribe with custom onNext
collection.find().subscribe((doc: Document) => println(doc.toJson()))

// Subscribe with custom onNext and onError
collection.find().subscribe(
  (doc: Document) => println(doc.toJson()),
  (e: Throwable) => println(s"There was an error: $e"))

// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe(
  (doc: Document) => println(doc.toJson()),
  (e: Throwable) => println(s"There was an error: $e"),
  () => println("Completed!"))

The org.mongodb.scala package includes an implicit class that also provides Monadic operators, which make chaining and working with Publisher or Observable instances simpler. The following example uses the andThen operator:

GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})

The following list describes the available Monadic operators:

  • andThen: Allows the chaining of Observable instances.

  • collect: Collects all the results into a sequence.

  • fallbackTo: Allows falling back to an alternative Observable if there is a failure.

  • filter: Filters results of the Observable.

  • flatMap: Creates a new Observable by applying a function to each result of the Observable.

  • foldLeft: Creates a new Observable that contains the single result of the applied accumulator function.

  • foreach: Applies a function to each emitted result.

  • head: Returns the head of the Observable in a Future.

  • map: Creates a new Observable by applying a function to each emitted result of the Observable.

  • observeOn: Creates a new Observable that uses a specific ExecutionContext for future operations.

  • recover: Creates a new Observable that will handle any matching throwable that this Observable might contain.

  • recoverWith: Creates a new Observable that will handle any matching throwable that this Observable might contain by assigning it a value of another Observable.

  • toFuture: Collects the Observable results and converts them to a Future.

  • transform: Creates a new Observable by applying the resultFunction function to each emitted result.

  • withFilter: Provides for-comprehensions support to Observable instances.

  • zip: Zips the values of this and another Observable, and creates a new Observable holding the tuple of their results.

See the BoxedPublisher API documentation to learn more about each operator.
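
A few of these operators can be combined as in the following sketch. It assumes the in-memory Observable(...) factory so that it runs without a database, and the object and method names are hypothetical. Results are awaited only to keep the example short; production code would normally stay asynchronous:

```scala
import org.mongodb.scala._

import scala.concurrent.Await
import scala.concurrent.duration._

object OperatorChainExample {
  // In-memory source used in place of a query result.
  def numbers: Observable[Int] = Observable(1 to 10)

  // filter and map build a new Observable; toFuture collects its results.
  def evenSquares(): Seq[Int] =
    Await.result(numbers.filter(_ % 2 == 0).map(n => n * n).toFuture(), 5.seconds)

  // foldLeft reduces the stream to one value; head returns it in a Future.
  def total(): Int =
    Await.result(numbers.foldLeft(0)(_ + _).head(), 5.seconds)

  // withFilter and map make for-comprehensions possible.
  def largeTimesTen(): Seq[Int] = {
    val scaled = for (n <- numbers if n > 8) yield n * 10
    Await.result(scaled.toFuture(), 5.seconds)
  }
}
```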

Because a SingleObservable[T] returns a single item, the toFuture() method returns a Future[T] in the same way as the head method does. There is also an implicit converter that converts a Publisher to a SingleObservable.
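
The difference in return types can be sketched as follows, assuming the SingleObservable(...) companion factory for wrapping a single in-memory item. The object and value names are hypothetical:

```scala
import org.mongodb.scala._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SingleObservableExample {
  // A SingleObservable that emits exactly one item.
  def single: SingleObservable[Int] = SingleObservable(42)

  // toFuture on a SingleObservable[T] yields Future[T], not Future[Seq[T]].
  def viaToFuture: Future[Int] = single.toFuture()

  // head yields the same type.
  def viaHead: Future[Int] = single.head()

  def awaitBoth(): (Int, Int) =
    (Await.result(viaToFuture, 5.seconds), Await.result(viaHead, 5.seconds))
}
```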
