Sample Custom Subscriber Implementations
This guide provides background about the Java Reactive Streams driver and its asynchronous API. The guide also lists and explains sample custom subscriber implementations.
Note
For instructions on how to install the driver, see the Get Started guide.
Reactive Streams
This library is an implementation of the reactive streams specification. The reactive stream API consists of the following components:
A Publisher is a provider of a potentially unbounded number of sequenced elements, published according to the demand received from its Subscriber or multiple instances of Subscriber.
In response to a call to Publisher.subscribe(Subscriber), the possible invocation sequences for methods on the Subscriber interface are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe is always signaled first, followed by a possibly unbounded number of onNext signals, as requested by the Subscriber. The sequence ends with an onError signal if there is a failure, or an onComplete signal when no more elements are available, as long as the Subscription is not canceled.
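The signal order described above can be observed with a small recording subscriber. The following is a minimal sketch, not driver code: to stay runnable without extra dependencies it assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams interfaces, and uses the JDK's SubmissionPublisher as a stand-in publisher. The SignalOrderSketch and RecordingSubscriber names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SignalOrderSketch {

    /** Hypothetical subscriber that records every signal it receives, in order. */
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> signals = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            signals.add("onSubscribe");
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(T item) {
            signals.add("onNext(" + item + ")");
        }

        @Override
        public void onError(Throwable t) {
            signals.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            signals.add("onComplete");
            done.countDown();
        }
    }

    static List<String> run() throws InterruptedException {
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        // SubmissionPublisher is a JDK class used here only as a stand-in publisher.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(1);
            publisher.submit(2);
        } // close() signals onComplete once the submitted items are delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for a terminal signal");
        }
        return subscriber.signals;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints onSubscribe, onNext(1), onNext(2), onComplete, one per line.
        run().forEach(System.out::println);
    }
}
```

The recorded sequence matches the protocol: one onSubscribe, zero or more onNext signals, then at most one terminal signal.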
Tip
To learn more about reactive streams, visit the Reactive Streams documentation.
Subscribers
The Java Reactive Streams driver API mirrors the Java Sync driver API, except that any method that causes network I/O returns a Publisher<T> type, where T is the type of response for the operation.
Note
All Publisher types returned from the API are cold, meaning that nothing happens until they are subscribed to. Creating a Publisher alone does not cause any network I/O. The driver executes the operation only when you call the Publisher.subscribe() method.
Publishers in this implementation are unicast. Each Subscription to a Publisher relates to a single MongoDB operation, and each Subscriber receives its own set of results.
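To make the cold and unicast behavior concrete, here is a minimal sketch of a publisher that does no work until a subscriber requests data, and that runs its operation once per subscription. It is an illustration under stated assumptions, not the driver's implementation: it uses the JDK's java.util.concurrent.Flow interfaces instead of org.reactivestreams, emits a single value, and skips most of the specification's rules. The ColdPublisherSketch, cold, and OneShotSubscriber names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ColdPublisherSketch {

    /** Simplified cold publisher: runs the operation once per subscriber, on first request. */
    static <T> Flow.Publisher<T> cold(Supplier<T> operation) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean started = new AtomicBoolean();

            @Override
            public void request(long n) {
                // Simplification: invalid demand and repeated requests are ignored.
                if (n <= 0 || !started.compareAndSet(false, true)) {
                    return;
                }
                T value;
                try {
                    value = operation.get(); // the "operation" runs only now
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                started.set(true); // prevents the operation from running later
            }
        });
    }

    /** Hypothetical subscriber that requests one element and stores it. */
    static final class OneShotSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(1); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    static List<String> run() {
        AtomicInteger executions = new AtomicInteger();
        Flow.Publisher<String> publisher = cold(() -> "result " + executions.incrementAndGet());

        List<String> log = new ArrayList<>();
        log.add("executions before subscribe: " + executions.get());

        OneShotSubscriber first = new OneShotSubscriber();
        OneShotSubscriber second = new OneShotSubscriber();
        publisher.subscribe(first);  // triggers the first execution
        publisher.subscribe(second); // triggers a separate, second execution
        log.add("first received: " + first.received);
        log.add("second received: " + second.received);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Creating the publisher leaves the execution count at zero, and each subscriber then triggers its own execution and receives its own result.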
Custom Subscriber Implementations
The examples in the Java Reactive Streams documentation use several custom Subscriber types. Although blocking is an artificial scenario for reactive streams, the examples block on the results of one operation before starting the next to ensure the state of the database. To see the source code for all the custom subscriber implementations, see SubscriberHelpers.java in the driver source code.
ObservableSubscriber
- The base subscriber class is ObservableSubscriber<T>, a Subscriber that stores the results of the Publisher<T>. It also contains an await() method so we can block for results to ensure the state of the database before going on to the next example.
OperationSubscriber
- An implementation of ObservableSubscriber that immediately calls Subscription.request() when it is subscribed to.
PrintSubscriber
- An implementation of OperationSubscriber that prints a message when the Subscriber.onComplete() method is called.
ConsumerSubscriber
- An implementation of OperationSubscriber that takes a Consumer and calls Consumer.accept(result) when Subscriber.onNext(T result) is called.
PrintToStringSubscriber
- An implementation of ConsumerSubscriber that prints the string version of the result when the Subscriber.onNext() method is called.
PrintDocumentSubscriber
- An implementation of ConsumerSubscriber that prints the JSON version of a Document type when the Subscriber.onNext() method is called.
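The layering described in this list can be sketched roughly as follows. This is not the code from SubscriberHelpers.java: the class names (LatchSubscriber, AutoRequestSubscriber, CallbackSubscriber) are hypothetical, the sketch assumes the JDK's java.util.concurrent.Flow interfaces so that it runs without the driver, and it assumes that the latch is also released on onError so that await() cannot hang after a failure.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscriberHierarchySketch {

    /** Base type: stores results and offers a blocking await(). */
    static class LatchSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Throwable error;

        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(T item) { received.add(item); }
        // Assumption: release the latch on failure too, so await() returns.
        @Override public void onError(Throwable t) { error = t; latch.countDown(); }
        @Override public void onComplete() { latch.countDown(); }

        /** Blocks until a terminal signal arrives or the timeout elapses. */
        List<T> await(long timeout, TimeUnit unit) throws InterruptedException {
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException("timed out waiting for the publisher");
            }
            if (error != null) {
                throw new IllegalStateException("publisher signaled onError", error);
            }
            return received;
        }
    }

    /** Requests all elements as soon as it is subscribed. */
    static class AutoRequestSubscriber<T> extends LatchSubscriber<T> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            super.onSubscribe(subscription);
            subscription.request(Long.MAX_VALUE);
        }
    }

    /** Hands each element to a Consumer before storing it. */
    static class CallbackSubscriber<T> extends AutoRequestSubscriber<T> {
        private final Consumer<T> consumer;

        CallbackSubscriber(Consumer<T> consumer) { this.consumer = consumer; }

        @Override
        public void onNext(T item) {
            consumer.accept(item);
            super.onNext(item);
        }
    }

    static List<String> run() throws InterruptedException {
        List<String> seen = new CopyOnWriteArrayList<>();
        CallbackSubscriber<String> subscriber = new CallbackSubscriber<>(seen::add);
        // SubmissionPublisher is a JDK stand-in for a driver publisher.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        }
        subscriber.await(5, TimeUnit.SECONDS); // blocks until onComplete releases the latch
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Each subclass overrides a single callback and delegates to its parent, so the storage and latch logic stays in one place.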
Blocking and Non-Blocking Examples
As our Subscriber types contain a latch that is only released when the onComplete() method of the Subscriber is called, we can use that latch to block further actions by calling the await() method.
The following two examples use our auto-requesting PrintDocumentSubscriber. The first is non-blocking and the second blocks waiting for the Publisher to complete:
// Create a publisher
Publisher<Document> publisher = collection.find();

// Non-blocking
publisher.subscribe(new PrintDocumentSubscriber());

// Blocking: declare the variable as ObservableSubscriber, because the
// Subscriber interface itself does not declare await()
ObservableSubscriber<Document> subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete
Publishers, Subscribers, and Subscriptions
In general, the Publisher, Subscriber, and Subscription types comprise a low-level API, and it's expected that users and libraries will build more expressive APIs on top of them rather than use these interfaces alone. Because this library implements only these interfaces, its users benefit from this growing ecosystem, which is a core design principle of reactive streams.