カスタムサブスクライバーの実装例
このガイドでは、 Java Reactive Streams ドライバーとその非同期APIの背景を説明します。 このガイドでは、サンプルのカスタム サブスクライブの実装もリストして説明しています。
注意
ドライバーをインストールする方法については、「 を使い始める 」ガイドを参照してください。
Reactive Streams
このライブラリは、リアクティブ ストリーム仕様の実装です。 Reactive Stream API は、次のコンポーネントで構成されています。
Publisher
は、 Subscriber
またはSubscriber
の複数のインスタンスから受信した要求に従って公開される、潜在的に無制限の数のシーケンス化された要素のプロバイダーです。
Publisher.subscribe(Subscriber)
への呼び出しに応答して、 Subscriber
クラスのメソッドに可能な呼び出しシーケンスは、次のプロトコルによって提供されます。
onSubscribe onNext* (onError | onComplete)?
つまり、 onSubscribe
は常にシグナルられ、その後にSubscriber
からリクエストされた場合、上限の数のonNext
シグナルが続く可能性があります。 これには、障害が発生した場合はonError
シグナルが送信され、 Subscription
がキャンセルされていない限り、使用できる要素が ない 場合はonComplete
シグナルが送信されます。
サブスクリプション
Java Reactive Streams ドライバーAPIは、 Java Sync Driver APIとネットワーク I/O により Publisher<T>
タイプが返されるすべてのメソッドをミラーリングします。ここでは、T
は操作の応答のタイプです。
注意
API から返されるすべての 型はPublisher
コールド です 、 にサブスクライブされるまで何も発生しないということを意味します。したがって、 Publisher
を作成するだけでは、ネットワーク I/O は発生しません。 ドライバーによって操作が実行されるのは、 Publisher.subscribe()
メソッドを呼び出すまでです。
この実装ではパウンドは ユニット です。 Subscription
に対する各Publisher
は 1 つの MongoDB 操作に関連し、Publisher
インスタンスのSubscriber
には独自の結果セットが与えられます。
カスタム サブスクリプションの実装
Java Reactive Streams のドキュメントでは、さまざまなSubscriber
型を実装しました。 これはリアクティブなストリームの論理的なシナリオですが、データベースの状態を確保するために、次の例を開始する前に、ある例の結果をブロックします。 すべてのカスタム サブスクリプ ション実装のソースコードを確認するには、 ドライバーソースコード内の 。
ObservableSubscriber
- 基本 サブスク ライブ <T>
Subscriber
クラスは、 、 の結果を保存する 。Publisher<T>
また、await()
メソッドも含まれているため、次の例に進む前に結果をブロックしてデータベースの状態を確保できます。
OperationSubscriber
- サブスクライブ時にすぐに
Subscription.request()
を呼び出すObservableSubscriber
の実装。
PrintSubscriber
Subscriber.onComplete()
メソッドが呼び出されたときにメッセージを出力するOperationSubscriber
の実装。
ConsumerSubscriber
OperationSubscriber
Consumer
Consumer.accept(result)
Subscriber.onNext(T result)
を受け取り、 が呼び出されたときに呼び出す の実装。
PrintToStringSubscriber
Subscriber.onNext()
メソッドが呼び出されたときにresult
の string バージョンを出力するConsumerSubscriber
の実装。
PrintDocumentSubscriber
Subscriber.onNext()
メソッドの呼び出し時にDocument
タイプの JSON バージョンを出力するConsumerSubscriber
の実装。
ブロッキングと非ブロッキングの例
Subscriber
型には、 Subscriber
のonComplete()
メソッドが呼び出された場合にのみリリースされるラッチが含まれているため、 await
メソッドを呼び出すことで、そのラッチを使用してそれ以上のアクションをブロックできます。 次の 2 つの例では、自動リクエストPrintDocumentSubscriber
を使用します。
最初は非ブロッキングで、2 番目のブロックはPublisher
が完了するのを待機しています。
// Create a publisher Publisher<Document> publisher = collection.find(); // Non-blocking publisher.subscribe(new PrintDocumentSubscriber()); Subscriber<Document> subscriber = new PrintDocumentSubscriber(); publisher.subscribe(subscriber); subscriber.await(); // Block for the publisher to complete
出版社、サブスクリプション、サブスクリプション
一般に、 Publisher
、 Subscriber
、 Subscription
型は低レベル API を構成し、これらのインターフェースのみを使用するのではなく、ユーザーとライブラリがより表現性の高い API を構築することが予想されます。 これらのインターフェースのみを実装するライブラリとして、ユーザーはこの増加するエコシステムのメリットを得ます。これは、リアクティブなストリームの主要設計原則です。