Docs 菜单
Docs 主页
/ / /
Java Reactive Streams 驱动程序

自定义订阅者实施示例

在此页面上

  • 反应流
  • 订阅者
  • 自定义订阅者实施
  • 阻塞和非阻塞示例
  • 发布者、订阅者和订阅

本指南提供有关Java Reactive Streams驾驶员及其异步API的背景。 本指南还列出并解释了示例自定义订阅者实现。

注意

有关如何安装驾驶员的说明,请参阅入门指南。

该库是响应式流规范的实现。 响应式流 API 由以下组件组成:

  1. 发布者

  2. 订阅者

  3. 订阅

Publisher是数量可能不受限制的排序元素的提供者,根据从其SubscriberSubscriber的多个实例收到的需求进行发布。

为响应对Publisher.subscribe(Subscriber)的调用, Subscriber类上的方法的可能调用序列由以下协议给出:

onSubscribe onNext* (onError | onComplete)?

这意味着onSubscribe始终会发出信号,然后按照Subscriber的请求,可能有无限数量的onNext信号。 只要Subscription未取消,如果出现故障,则后跟一个onError信号,或者当没有更多元素可用时,后跟一个onComplete信号。

提示

要了解有关响应式流的更多信息,请访问响应 式流文档。

Java Reactive Streams 驱动程序 API 镜像Java Sync 驱动程序API 以及导致网络 I/O 返回Publisher<T>类型的任何方法,其中T是操作的响应类型。

注意

从 API 返回的所有 类型都是Publisher 冷 类型 ,这意味着在订阅之前什么都不会发生。因此,仅创建Publisher不会导致任何网络 I/O。 直到您调用Publisher.subscribe()方法,驱动程序才会执行操作。

此实现中的发布者是单播的。 Publisher的每个Subscription都与一个 MongoDB 操作相关, Publisher实例的Subscriber会接收自己的一组特定结果。

在Java Reactive Streams 文档中,我们实现了不同的Subscriber类型。 尽管这是响应式流的人为场景,但在开始下一个示例之前,我们确实会区块一个示例的结果,以确保数据库的状态。 要查看所有自定义订阅者实现的源代码,请参阅 SubscriberHelpers.java 在驾驶员源代码中。

  • ObservableSubscriber
    基本订阅者类是 ObservableSubscriber<T> ,一个存储SubscriberPublisher<T>它还包含一个await()方法,因此我们可以阻止结果,以在继续下一个示例之前确保数据库的状态。
  • OperationSubscriber
    订阅后立即调用Subscription.request()ObservableSubscriber的实现。
  • PrintSubscriber
    OperationSubscriber的实现,用于在调用Subscriber.onComplete()方法时打印一条消息。
  • ConsumerSubscriber
    的实现,它接受 并在调用 时调用OperationSubscriberConsumerConsumer.accept(result)Subscriber.onNext(T result)
  • PrintToStringSubscriber
    ConsumerSubscriber 的实现,用于在调用 Subscriber.onNext() 方法时打印 result 的string版本。
  • PrintDocumentSubscriber
    ConsumerSubscriber的实现,用于在调用Subscriber.onNext()方法时打印Document类型的 JSON 版本。

由于我们的Subscriber类型包含一个锁存器,该锁存器仅在调用SubscriberonComplete()方法时才会释放,因此我们可以通过调用await方法使用该锁存器来阻止进一步的动作。 以下两个示例使用我们的自动请求PrintDocumentSubscriber

第一个是非阻塞的,第二个是阻塞的,等待Publisher完成:

// Create a publisher
Publisher<Document> publisher = collection.find();
// Non-blocking
publisher.subscribe(new PrintDocumentSubscriber());
Subscriber<Document> subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete

一般来说, PublisherSubscriberSubscription类型由低级 API 组成,预计用户和库将基于它们构建更具表现力的 API,而不是仅仅使用这些接口。 作为一个单独实现这些接口的库,用户将受益于这个不断发展的生态系统,这是响应式流的核心设计原则。

后退

POJO 增删改查示例