Observables
The Scala driver is an asynchronous and non-blocking driver. Because it
implements the Observable model, asynchronous events become simple,
composable operations that are free from the complexity of nested callbacks.
For asynchronous operations, there are three interfaces: Observable,
Observer, and Subscription.
Note
The driver is built upon the MongoDB Reactive Streams driver and is an
implementation of the reactive streams specification. Observable is an
implementation of Publisher and Observer is an implementation of Subscriber.
The following class naming conventions apply:
Observable: custom implementation of a Publisher
Observer: custom implementation of a Subscriber
Subscription
Observable
The Observable is an extended Publisher implementation, and in general it
represents a MongoDB operation that emits its results to the Observer
based on a request from the Subscription to the Observable.
Important
An Observable can be thought of as a partial function. As with a partial
function, nothing happens until it is called, which for an Observable means
until it is subscribed to. An Observable can be subscribed to multiple
times, with each subscription potentially causing new side effects, such as
querying MongoDB or inserting data.
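The laziness described above can be observed without a database. The following is a minimal sketch, not taken from the driver documentation: it assumes the `Observable(...)` factory in the `org.mongodb.scala` package, which wraps an in-memory `Iterable`, and uses a hypothetical counter named `calls` as a stand-in for a side effect such as a query.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import org.mongodb.scala._

// Hypothetical stand-in for a side effect: counts how often the mapping function runs.
val calls = new AtomicInteger(0)

// Building the pipeline only describes the work; nothing is executed yet.
val doubled: Observable[Int] = Observable(1 to 3).map { n =>
  calls.incrementAndGet()
  n * 2
}
val callsBeforeSubscribing = calls.get()

// toFuture() subscribes to the Observable, so the pipeline runs here.
val firstRun: Seq[Int] = Await.result(doubled.toFuture(), 10.seconds)
val callsAfterFirstRun = calls.get()

// A second subscription is independent and runs the pipeline again.
val secondRun: Seq[Int] = Await.result(doubled.toFuture(), 10.seconds)
val callsAfterSecondRun = calls.get()
```

With a real collection, the same reasoning suggests that subscribing twice to the result of an insert or a query would send the operation to the server twice, so it is usually worth holding on to the `Future` rather than subscribing again.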
SingleObservable
The SingleObservable trait is a Publisher implementation that returns only
a single item. It can be used in the same way as an ordinary Observable.
Subscription
A Subscription represents a one-to-one lifecycle of an Observer subscribing
to an Observable. A Subscription to an Observable can only be used by a
single Observer. The purpose of a Subscription is to control demand and to
allow unsubscribing from the Observable.
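Both purposes of a Subscription can be sketched with an in-memory source. This example is illustrative only: it assumes the `Observable(...)` factory from `org.mongodb.scala` for wrapping an `Iterable`, and the names `limit`, `seen`, and `done` are hypothetical. The Observer signals demand for a fixed number of results and unsubscribes once it has received them.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import org.mongodb.scala._

val limit = 3                       // how many results this Observer wants
val seen = ListBuffer.empty[Int]    // results received so far
val done = Promise[Unit]()          // completed when the Observer is finished

Observable(1 to 100).subscribe(new Observer[Int] {
  private var subscription: Option[Subscription] = None

  override def onSubscribe(s: Subscription): Unit = {
    subscription = Some(s)
    s.request(limit.toLong)         // control demand: ask for `limit` results only
  }

  override def onNext(result: Int): Unit = {
    seen += result
    if (seen.size == limit) {
      subscription.foreach(_.unsubscribe()) // no further results are wanted
      done.trySuccess(())
    }
  }

  override def onError(e: Throwable): Unit = done.tryFailure(e)

  override def onComplete(): Unit = done.trySuccess(())
})

Await.result(done.future, 10.seconds)
```

Because only `limit` results were requested, the remaining values of the source are never delivered to this Observer.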
Observer
An Observer provides the mechanism for receiving push-based notifications
from the Observable. Demand for these events is signaled by its
Subscription.
Upon subscription to an Observable[TResult], the Observer will be passed
the Subscription through the onSubscribe(subscription: Subscription)
method. Demand for results is signaled through the Subscription, and any
results are passed to the onNext(result: TResult) method. If there is an
error for any reason, the onError(e: Throwable) method will be called and
no more events are passed to the Observer. Alternatively, when the Observer
has consumed all the results from the Observable, the onComplete() method
will be called.
Back Pressure
In the following example, the Subscription is used to control demand when
iterating an Observable. The default Observer implementation automatically
requests all the data. The example overrides the onSubscribe() method so
that it can manage the demand-driven iteration of the Observable:
import org.mongodb.scala._

// Assumes `collection` is an existing MongoCollection[Document].
collection.find().subscribe(new Observer[Document]() {
  var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None

  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }

  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    // Request the next batch once the current one has been consumed.
    if (seen == batchSize) {
      seen = 0
      subscription.get.request(batchSize)
    }
  }

  override def onError(e: Throwable): Unit = println(s"Error: $e")

  override def onComplete(): Unit = println("Completed")
})
Observable Helpers
The org.mongodb.scala package provides improved interaction with Publisher
types. The extended functionality includes simple subscription through
anonymous functions:
// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))

// Subscribe with custom onNext and onError:
collection.find().subscribe(
  (doc: Document) => println(doc.toJson()),
  (e: Throwable) => println(s"There was an error: $e")
)

// Subscribe with custom onNext, onError and onComplete:
collection.find().subscribe(
  (doc: Document) => println(doc.toJson()),
  (e: Throwable) => println(s"There was an error: $e"),
  () => println("Completed!")
)
The org.mongodb.scala package includes an implicit class that also provides
the following monadic operators to make chaining and working with Publisher
or Observable instances simpler:
GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})
The following list describes the available monadic operators:
andThen: Allows the chaining of Observable instances.
collect: Collects all the results into a sequence.
fallbackTo: Allows falling back to an alternative Observable if there is a failure.
filter: Filters results of the Observable.
flatMap: Creates a new Observable by applying a function to each result of the Observable.
foldLeft: Creates a new Observable that contains the single result of the applied accumulator function.
foreach: Applies a function to each emitted result.
head: Returns the head of the Observable in a Future.
map: Creates a new Observable by applying a function to each emitted result of the Observable.
observeOn: Creates a new Observable that uses a specific ExecutionContext for future operations.
recover: Creates a new Observable that will handle any matching throwable that this Observable might contain.
recoverWith: Creates a new Observable that will handle any matching throwable that this Observable might contain by assigning it a value of another Observable.
toFuture: Collects the Observable results and converts them to a Future.
transform: Creates a new Observable by applying the resultFunction function to each emitted result.
withFilter: Provides for-comprehensions support to Observable instances.
zip: Zips the values of this and another Observable, and creates a new Observable holding the tuple of their results.
See the BoxedPublisher API documentation to learn more about each operator.
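A few of the operators listed above can be tried against an in-memory source. This is a hedged sketch rather than an excerpt from the driver documentation: it assumes the `Observable(...)` factory from `org.mongodb.scala` for wrapping an `Iterable`, and a driver version in which `foldLeft` returns a SingleObservable. All value names are hypothetical.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import org.mongodb.scala._

val numbers: Observable[Int] = Observable(1 to 10)

// filter and map each create a new Observable; toFuture() collects the results.
val evenSquares: Seq[Int] =
  Await.result(numbers.filter(_ % 2 == 0).map(n => n * n).toFuture(), 10.seconds)

// foldLeft reduces every emitted result to a single accumulated value.
val sum: Int = Await.result(numbers.foldLeft(0)(_ + _).toFuture(), 10.seconds)

// head() returns the first emitted result in a Future.
val firstNumber: Int = Await.result(numbers.head(), 10.seconds)

// withFilter and map make for-comprehensions work on an Observable.
val scaled: Seq[Int] =
  Await.result((for (n <- numbers if n > 8) yield n * 10).toFuture(), 10.seconds)
```

Each operator returns a new Observable or Future and leaves `numbers` untouched, which is why the same source can feed all four pipelines.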
SingleObservable
Because a SingleObservable[T] returns a single item, the toFuture() method
returns a Future[T] in the same way as the head method does. There is also
an implicit converter that converts a Publisher to a SingleObservable.
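The difference in return types can be seen by comparing the two kinds of Observable side by side. The sketch below assumes the `SingleObservable(...)` and `Observable(...)` factories from `org.mongodb.scala`, which wrap in-memory values. The value names are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import org.mongodb.scala._

val single: SingleObservable[String] = SingleObservable("only item")
val many: Observable[String] = Observable(Seq("a", "b", "c"))

// On a SingleObservable, toFuture() yields the item itself.
val item: Future[String] = single.toFuture()

// On an ordinary Observable, toFuture() yields all results as a sequence.
val items: Future[Seq[String]] = many.toFuture()

// head() on an ordinary Observable yields only the first result.
val firstItem: Future[String] = many.head()
```

The explicit type annotations are there to make the compiler confirm which `toFuture()` variant applies to each static type.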