Java Reactive Streams Driver

Sample Custom Subscriber Implementations

This guide provides background about the Java Reactive Streams driver and its asynchronous API. The guide also lists and explains sample custom subscriber implementations.

Note

For instructions on how to install the driver, see the Get Started guide.

This library is an implementation of the reactive streams specification. The reactive stream API consists of the following components:

  1. Publisher

  2. Subscriber

  3. Subscription

A Publisher is a provider of a potentially unbounded number of sequenced elements, published according to the demand received from its Subscriber or multiple instances of Subscriber.

In response to a call to Publisher.subscribe(Subscriber), the possible invocation sequences for methods on the Subscriber class are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signaled, followed by a possibly unbounded number of onNext signals, as requested by Subscriber. This is followed by an onError signal if there is a failure or an onComplete signal when no more elements are available, as long as the Subscription is not canceled.
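The signal order can be observed with a small recording subscriber. The following is a minimal sketch, not driver code: it assumes JDK 9 or later and uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher, which mirror the org.reactivestreams interfaces, so that it runs without a database. The names SignalOrderDemo and recordSignals are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SignalOrderDemo {

    // Publishes the given items and returns the signals one subscriber saw, in order.
    static List<String> recordSignals(List<String> items) throws InterruptedException {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    signals.add("onSubscribe");
                    subscription.request(Long.MAX_VALUE); // signal unbounded demand
                }

                @Override
                public void onNext(String item) {
                    signals.add("onNext(" + item + ")");
                }

                @Override
                public void onError(Throwable t) {
                    signals.add("onError");
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered

        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("publisher did not terminate in time");
        }
        return List.copyOf(signals);
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [onSubscribe, onNext(a), onNext(b), onComplete]
        System.out.println(recordSignals(List.of("a", "b")));
    }
}
```

Exactly one terminal signal arrives here because the subscription is never canceled. A canceled subscription might receive neither onError nor onComplete, which is why the terminal signal is optional in the protocol.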

Tip

To learn more about reactive streams, visit the Reactive Streams documentation.

The Java Reactive Streams driver API mirrors the Java Sync driver API. Any method that causes network I/O returns a Publisher<T> type, where T is the type of response for the operation.

Note

All Publisher types returned from the API are cold, meaning that nothing happens until they are subscribed to. So just creating a Publisher won’t cause any network I/O. It’s not until you call the Publisher.subscribe() method that the driver executes the operation.

Publishers in this implementation are unicast. Each Subscription to a Publisher relates to a single MongoDB operation, and each Subscriber receives its own set of results.
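The cold and unicast behavior can be illustrated without a database. The sketch below is an assumption-laden stand-in, not the driver's implementation: it uses java.util.concurrent.Flow (JDK 9+) instead of org.reactivestreams, simulates the operation with a counter, and delivers results synchronously with no re-entrancy or invalid-demand handling. ColdPublisherDemo, findAll, and collect are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

final class ColdPublisherDemo {

    // Returns a cold publisher; the simulated operation runs once per subscribe() call.
    static Flow.Publisher<String> findAll(AtomicInteger executions) {
        return subscriber -> {
            int run = executions.incrementAndGet(); // the simulated "I/O" happens here
            Iterator<String> results = List.of("doc1-run" + run, "doc2-run" + run).iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Simplified: synchronous delivery, single-threaded use only
                    while (n-- > 0 && !done && results.hasNext()) {
                        subscriber.onNext(results.next());
                    }
                    if (!done && !results.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Subscribes with unbounded demand and returns what this one subscriber received.
    static List<String> collect(Flow.Publisher<String> publisher) {
        List<String> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger executions = new AtomicInteger();
        Flow.Publisher<String> publisher = findAll(executions);
        System.out.println(executions.get());   // prints 0: nothing ran yet
        System.out.println(collect(publisher)); // prints [doc1-run1, doc2-run1]
        System.out.println(collect(publisher)); // prints [doc1-run2, doc2-run2]
    }
}
```

Each call to subscribe() triggers a separate run of the simulated operation, so the two subscribers never share results.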

The examples in the Java Reactive Streams documentation use several custom Subscriber implementations. Although blocking is an artificial scenario for reactive streams, the examples block on the results of one operation before starting the next to ensure the state of the database. To see the source code for all the custom subscriber implementations, see SubscriberHelpers.java in the driver source code.

  • ObservableSubscriber
    The base subscriber class is the ObservableSubscriber<T>, a Subscriber that stores the results of the Publisher<T>. It also contains an await() method so we can block for results to ensure the state of the database before going on to the next example.
  • OperationSubscriber
    An implementation of the ObservableSubscriber that immediately calls Subscription.request() when it is subscribed to.
  • PrintSubscriber
    An implementation of the OperationSubscriber that prints a message when the Subscriber.onComplete() method is called.
  • ConsumerSubscriber
    An implementation of OperationSubscriber that takes a Consumer and calls Consumer.accept(result) when Subscriber.onNext(T result) is called.
  • PrintToStringSubscriber
    An implementation of ConsumerSubscriber that prints the string version of the result when the Subscriber.onNext() method is called.
  • PrintDocumentSubscriber
    An implementation of the ConsumerSubscriber that prints the JSON version of a Document type when the Subscriber.onNext() method is called.
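The layering described in this list can be sketched as follows. This is a simplified illustration, not the contents of SubscriberHelpers.java: it assumes JDK 9+ and uses java.util.concurrent.Flow in place of org.reactivestreams, the await signature with a timeout is an assumption, and the latch is also released on onError so that a failure does not leave await() waiting. SubscriberSketch is a hypothetical wrapper class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SubscriberSketch {

    // Base class: stores results and exposes a latch-backed await().
    static class ObservableSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> received = new ArrayList<>();
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Throwable error;

        @Override public void onSubscribe(Flow.Subscription s) { /* no demand requested here */ }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable t) { error = t; latch.countDown(); }
        @Override public void onComplete() { latch.countDown(); }

        // Blocks until a terminal signal arrives, then returns the stored results.
        List<T> await(long timeout, TimeUnit unit) throws InterruptedException {
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException("publisher did not complete in time");
            }
            if (error != null) {
                throw new IllegalStateException(error);
            }
            return received;
        }
    }

    // Requests elements as soon as it is subscribed.
    static class OperationSubscriber<T> extends ObservableSubscriber<T> {
        @Override
        public void onSubscribe(Flow.Subscription s) {
            super.onSubscribe(s);
            s.request(Long.MAX_VALUE);
        }
    }

    // Passes each result to a Consumer before storing it.
    static class ConsumerSubscriber<T> extends OperationSubscriber<T> {
        private final Consumer<T> consumer;

        ConsumerSubscriber(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onNext(T item) {
            consumer.accept(item);
            super.onNext(item);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumerSubscriber<String> subscriber =
                new ConsumerSubscriber<String>(item -> System.out.println("got " + item));
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("first");
            publisher.submit("second");
        }
        // Blocks until onComplete, then prints [first, second]
        System.out.println(subscriber.await(5, TimeUnit.SECONDS));
    }
}
```

A printing subscriber in this scheme is only a ConsumerSubscriber constructed with a printing Consumer, which is the relationship the list describes for PrintToStringSubscriber and PrintDocumentSubscriber.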

Because our Subscriber types contain a latch that is released only when the onComplete() method of the Subscriber is called, we can use that latch to block further actions by calling the await() method. The following two examples use our auto-requesting PrintDocumentSubscriber.

The first is non-blocking and the second blocks waiting for the Publisher to complete:

// Create a publisher
Publisher<Document> publisher = collection.find();
// Non-blocking
publisher.subscribe(new PrintDocumentSubscriber());
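// Blocking: declare the variable as ObservableSubscriber, because the
// Subscriber interface itself has no await() method
ObservableSubscriber<Document> subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete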

In general, the Publisher, Subscriber, and Subscription types comprise a low-level API, and it is expected that users and libraries will build more expressive APIs upon them rather than use these interfaces alone. Because this library implements only these interfaces, its users benefit from this growing ecosystem, which is a core design principle of reactive streams.
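As one illustration of a more expressive API layered on these interfaces, the sketch below adapts a publisher to a CompletableFuture. It is a hypothetical helper, not part of the driver, and it assumes JDK 9+ with java.util.concurrent.Flow standing in for org.reactivestreams. PublisherAdapters and toFuture are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PublisherAdapters {

    // Collects every element of a publisher into a future list (hypothetical helper).
    static <T> CompletableFuture<List<T>> toFuture(Flow.Publisher<T> publisher) {
        CompletableFuture<List<T>> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();

            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { items.add(item); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onComplete() { future.complete(items); }
        });
        return future;
    }

    public static void main(String[] args) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        CompletableFuture<List<Integer>> future = toFuture(publisher);
        publisher.submit(1);
        publisher.submit(2);
        publisher.close(); // signals onComplete, which completes the future

        // Callers compose on the future; join() is used here only to print the result.
        System.out.println(future.thenApply(List::size).join()); // prints 2
    }
}
```

Callers of such a helper work with futures and never touch Subscription directly.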
