Observable からデータにアクセスする
項目一覧
Overview
このガイドでは、Observable
インスタンスからMongoDB操作の結果にアクセスする方法を学びます。
Observable
は、一定期間にわたる操作によって出力されるデータのストリームを表します。このデータにアクセスするには、Observable
をサブスクライブする Observer
インスタンスを作成します。
注意
Scalaドライバーは、 MongoDB Java Reactive Streams ドライバー上に構築されています。 Observable
クラスはJava Reactive Streams の Publisher
クラスを拡張し、結果処理に役立つ便利なメソッドが追加されています。
観察可能な値を処理する方法
MongoDB操作を実行してそのデータを処理するには、Observable
から操作結果をリクエスト必要があります。ドライバーは、任意の数の結果を返す操作用の Observable
インターフェースを提供します。結果を生成しないか、結果を 1 つ生成する操作(findOne()
メソッドなど)では、SingleObservable[T]
が返されます。 [T]
化パラメータは、SingleObservable
が取り扱うデータ型に対応します。
無制限の数の結果を生成できる操作では、Observable[T]
型のインスタンスが返されます。一部の操作では特定の Observable
タイプが返され、サブスクライブ前に結果をフィルタリングして処理するための追加メソッドが提供されます。次のリストでは、操作固有のメソッドを Observable
に連鎖させることができるいくつかのタイプについて説明します。
FindObservable[T]
:find()
メソッドによって返されるDistinctObservable[T]
:distinct()
メソッドによって返されるAggregateObservable[T]
:aggregate()
メソッドによって返される
操作の Observable
で subscribe()
メソッドを呼び出すことで、操作結果をリクエストできます。 Observer
クラスのインスタンスを subscribe()
メソッドのパラメーターとして渡します。この Observer
は、Observable
から操作結果を受け取ります。その後、Observer
クラスが提供するメソッドを使用して、結果を出力したり、エラーを処理したり、追加のデータ処理を実行したりできます。
結果の処理の詳細については、次のAPIドキュメントを参照してください。
サンプル データ
restaurants
sample_restaurants
このガイドの例では、Atlasサンプルデータセット の データベースの コレクションを使用します。 Scalaアプリケーションからこのコレクションにアクセスするには、AtlasMongoClient
クラスターに接続する を作成し、 変数と 変数に次の値を割り当てます。database
collection
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、 「Atlas を使い始める」ガイドを参照してください。
コールバックを使用した結果の処理
Observable[T]
にサブスクライブした後、Observer
クラスが提供する次のコールバックメソッドを使用して、操作結果にアクセスしたり、エラーを処理したりできます。
onNext(result: TResult)
:Observer
が新しい結果を受け取ったときに呼び出されます。このメソッドをオーバーライドすることで、結果を処理するロジックを定義できます。onError(e: Throwable)
:操作によってエラーが生成され、Observer
がObservable
からそれ以上のデータを受信できないようにされた場合に呼び出されます。このメソッドをオーバーライドすることで、エラー処理ロジックを定義できます。onComplete()
:Observer
がObservable
からの結果をすべて消費したときに呼び出されます。このメソッドをオーバーライドすることで、最終データ処理を実行できます。
次のセクションでは、これらのメソッドをカスタマイズして、Observable
からの読み取りおよび書込み操作の結果を処理する方法を示します。
読み取り操作の結果にアクセスする
読み取り操作によって取得されたデータにアクセスするには、操作結果を保存するための Observable[T]
を作成します。次に、 Observable をサブスクライブし、Observer
クラスのコールバックメソッドをオーバーライドして結果を処理します。
この例では、 cuisine
値が "Czech"
であるドキュメントを restaurants
コレクションでクエリします。結果を検索して処理するために、この例では操作に Observable[Document]
を割り当て、次のアクションを実行します。
subscribe()
メソッドを呼び出してObservable
にサブスクライブし、Observer
をパラメータとして渡しますonNext()
メソッドをオーバーライドして、検索された各ドキュメント(Document
インスタンスである)を出力しますエラーを出力するために
onError()
メソッドをオーバーライドしますObservable
からのすべての結果が処理された後にメッセージを出力するようにonComplete()
メソッドをオーバーライドします
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
書込み (write) 操作の結果にアクセスする
書込み操作によって検索されたデータにアクセスするには、操作結果を保存するための Observable[T]
を作成します。次に、 Observable をサブスクライブし、Observer
クラスのコールバックメソッドをオーバーライドして結果を処理します。
この例では、 cuisine
値が "Nepalese"
であるドキュメントを restaurants
コレクションに挿入します。結果を検索して処理するために、この例では操作に Observable[InsertManyResult]
を割り当て、次のアクションを実行します。
subscribe()
メソッドを呼び出してObservable
にサブスクライブし、Observer
をパラメータとして渡しますonNext()
メソッドをオーバーライドして、挿入操作の結果をInsertManyResult
として返しますエラーを出力するために
onError()
メソッドをオーバーライドしますObservable
からのすべての結果が処理された後にメッセージを出力するようにonComplete()
メソッドをオーバーライドします
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Lambda関数を使用した結果の処理
Observer
クラスのコールバック関数を明示的にオーバーライドする代わりに、Lambda 関数を使用して操作結果を簡潔に処理できます。これらの関数を使用すると、=>
矢印表記を使用して、onNext()
、onError()
、onComplete()
の実装をカスタマイズできます。
例
この例では、 restaurants
コレクションを borough
フィールドの個別の値ごとにクエリします。このコードは distinct()
メソッドによって返された Observable
をサブスクライブし、Lambda 関数を使用して結果を出力し、エラーを処理します。
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
すべての結果を検索するための将来使用
toFuture()
メソッドを呼び出すと、Observable
を暗黙的にサブスクライブし、その結果を集計できます。 Observable
で toFuture()
を呼び出すと、ドライバーは次のアクションを実行します。
サブスクライブ
Observable
Observable
が発行したアイテムを収集し、Future
インスタンスに保存します
次に、Future
を反復処理して操作結果を検索できます。
重要
Observable
に多数のドキュメントが含まれている場合、toFuture()
メソッドを呼び出すと大量のメモリが消費されます。大規模な結果セットが予想される場合は、コールバックまたは Lambda 関数を使用して結果にアクセスすることを検討してください。
例
この例では、 name
フィールドの値が "The Halal Guys"
であるドキュメントを restaurants
コレクションでクエリします。操作結果にアクセスするために、コードは Observable
を Future
に変換し、Future
が各結果を収集するまで待機して、結果を反復処理します。
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。