Docs Menu

Observable からデータにアクセスする

このガイドでは、ObservableインスタンスからMongoDB操作の結果にアクセスする方法を学びます。

Observable は、一定期間にわたる操作によって出力されるデータのストリームを表します。 このデータにアクセスするには、Observable をサブスクライブする Observerインスタンスを作成します。

注意

Scalaドライバーは、 MongoDB Java Reactive Streams ドライバー上に構築されています。 ObservableクラスはJava Reactive Streams の Publisherクラスを拡張し、結果処理に役立つ便利なメソッドが追加されています。

MongoDB操作を実行してそのデータを処理するには、Observable から操作結果をリクエスト必要があります。ドライバーは、任意の数の結果を返す操作用の Observable インターフェースを提供します。結果を生成しないか、結果を 1 つ生成する操作(findOne() メソッドなど)では、SingleObservable[T] が返されます。[T]化パラメータは、SingleObservable が取り扱うデータ型に対応します。

無制限の数の結果を生成できる操作では、Observable[T] 型のインスタンスが返されます。 一部の操作では特定の Observable タイプが返され、サブスクライブ前に結果をフィルタリングして処理するための追加メソッドが提供されます。 次のリストでは、操作固有のメソッドを Observable に連鎖させることができるいくつかのタイプについて説明します。

  • FindObservable[T]: find() メソッドによって返される

  • DistinctObservable[T]: distinct() メソッドによって返される

  • AggregateObservable[T]: aggregate() メソッドによって返される

操作の Observablesubscribe() メソッドを呼び出すことで、操作結果をリクエストできます。Observerクラスのインスタンスを subscribe() メソッドのパラメーターとして渡します。この Observer は、Observable から操作結果を受け取ります。その後、Observerクラスが提供するメソッドを使用して、結果を出力したり、エラーを処理したり、追加のデータ処理を実行したりできます。

結果の処理の詳細については、次のAPIドキュメントを参照してください。

restaurantssample_restaurantsこのガイドの例では、Atlasサンプルデータセット の データベースの コレクションを使用します。Scalaアプリケーションからこのコレクションにアクセスするには、Atlas クラスターに接続する MongoClient を作成し、database 変数と collection 変数に次の値を割り当てます。

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、 「Atlas を使い始める」ガイドを参照してください。

Observable[T] にサブスクライブした後、Observerクラスが提供する次のコールバックメソッドを使用して、操作結果にアクセスしたり、エラーを処理したりできます。

  • onNext(result: TResult): Observer が新しい結果を受け取ったときに呼び出されます。 このメソッドをオーバーライドすることで、結果を処理するロジックを定義できます。

  • onError(e: Throwable):操作エラーが生成され、ObserverObservable からそれ以上のデータを受信できないようにされた場合に呼び出されます。 このメソッドをオーバーライドすることで、エラー処理ロジックを定義できます。

  • onComplete(): ObserverObservable からの結果をすべて消費したときに呼び出されます。このメソッドをオーバーライドすることで、最終データ処理を実行できます。

次のセクションでは、これらのメソッドをカスタマイズして、Observable からの読み取りおよび書込み操作の結果を処理する方法を示します。

読み取り操作によって取得されたデータにアクセスするには、操作結果を保存するための Observable[T] を作成します。 次に、 Observable をサブスクライブし、Observerクラスのコールバックメソッドをオーバーライドして結果を処理します。

この例では、cuisine 値が "Czech" であるドキュメントを restaurantsコレクションでクエリします。結果を検索して処理するために、この例では操作に Observable[Document] を割り当て、次のアクションを実行します。

  • subscribe() メソッドを呼び出して Observable にサブスクライブし、Observer をパラメータとして渡します

  • onNext() メソッドをオーバーライドして、検索された各ドキュメント(Document インスタンスである)を出力します

  • エラーを出力するために onError() メソッドをオーバーライドします

  • Observable からのすべての結果が処理された後にメッセージを出力するように onComplete() メソッドをオーバーライドします

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

書込み操作によって検索されたデータにアクセスするには、操作結果を保存するための Observable[T] を作成します。 次に、 Observable をサブスクライブし、Observerクラスのコールバックメソッドをオーバーライドして結果を処理します。

この例では、 cuisine 値が "Nepalese" であるドキュメントを restaurantsコレクションに挿入します。 結果を検索して処理するために、この例では操作に Observable[InsertManyResult] を割り当て、次のアクションを実行します。

  • subscribe() メソッドを呼び出して Observable にサブスクライブし、Observer をパラメータとして渡します

  • onNext() メソッドをオーバーライドして、挿入操作の結果を InsertManyResult として返します

  • エラーを出力するために onError() メソッドをオーバーライドします

  • Observable からのすべての結果が処理された後にメッセージを出力するように onComplete() メソッドをオーバーライドします

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

Observerクラスのコールバック関数を明示的にオーバーライドする代わりに、Lambda 関数を使用して操作結果を簡潔に処理できます。 これらの関数を使用すると、=> 矢印表記を使用して、onNext()onError()onComplete() の実装をカスタマイズできます。

Tip

匿名関数とも呼ばれる Lambda 関数について詳しくは、Wikipedia で匿名関数のエントリを参照してください。

この例では、 restaurantsコレクションを boroughフィールドの個別の値ごとにクエリします。 このコードは distinct() メソッドによって返された Observable をサブスクライブし、Lambda 関数を使用して結果を出力し、エラーを処理します。

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

toFuture() メソッドを呼び出すと、Observable を暗黙的にサブスクライブし、その結果を集計できます。 ObservabletoFuture() を呼び出すと、ドライバーは次のアクションを実行します。

  • Observable にサブスクライブします

  • Observable が発行したアイテムを収集し、Futureインスタンスに保存します

次に、Future を反復処理して操作結果を検索できます。

重要

Observable に多数のドキュメントが含まれている場合、toFuture() メソッドを呼び出すと大量のメモリが消費されます。大規模な結果セットが予想される場合は、コールバックまたはLambda関数を使用して結果にアクセスすることを検討してください。

この例では、 nameフィールドの値が "The Halal Guys" であるドキュメントを restaurantsコレクションでクエリします。 操作結果にアクセスするために、コードは ObservableFuture に変換し、Future が各結果を収集するまで待機して、結果を反復処理します。

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。