从 Observable 访问数据
Overview
在本指南中,您可以学习;了解如何从 Observable
实例访问权限MongoDB操作的结果。
Observable
表示操作随时间推移发出的数据流。 要访问权限此数据,您可以创建一个订阅 Observable
的 Observer
实例。
注意
Scala驾驶员基于MongoDB Java Reactive Streams驾驶员构建。 Observable
类扩展了Java Reactive Streams 中的 Publisher
类,并包含其他有助于进程结果的便捷方法。
如何处理 Observable
要运行MongoDB操作并进程其数据,您必须从 Observable
请求操作结果。驾驶员为返回任意数量结果的操作提供了 Observable
接口。不产生结果或只产生一个结果的操作(例如 findOne()
方法)会返回 SingleObservable[T]
。[T]
参数化对应于 SingleObservable
处理的数据类型。
可以生成无限数量结果的操作会返回 Observable[T]
类型的实例。 某些操作会返回特定的 Observable
类型,这些类型提供了在订阅结果之前过滤和进程结果的其他方法。 以下列表描述了一些允许您将特定于操作的方法链接到 Observable
的类型:
FindObservable[T]
:由find()
方法返回DistinctObservable[T]
:由distinct()
方法返回AggregateObservable[T]
:由aggregate()
方法返回
您可以通过对操作的 Observable
调用 subscribe()
方法来请求操作结果。将 Observer
类的实例作为参数传递给 subscribe()
方法。此 Observer
从 Observable
接收操作结果。然后,您可以使用 Observer
类提供的方法打印结果、处理错误并执行其他数据处理。
要学习;了解有关处理结果的更多信息,请参阅以下API文档:
样本数据
restaurants
sample_restaurants
本指南中的示例使用Atlas示例数据集的 数据库中的 集合。要从Scala应用程序访问权限此集合,请创建一个连接到Atlas 集群的MongoClient
,并将以下值分配给 database
和 collection
变量:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。
使用回调处理结果
订阅Observable[T]
后,您可以使用 Observer
类提供的以下回调方法来访问权限操作结果或处理错误:
onNext(result: TResult)
:当Observer
收到新结果时调用。 您可以通过重写此方法来定义处理结果的逻辑。onError(e: Throwable)
:当操作生成错误并阻止Observer
从Observable
接收更多数据时调用。 您可以通过重写此方法来定义错误处理逻辑。onComplete()
:当Observer
消耗完来自Observable
的所有结果时调用。您可以通过重写此方法来执行任何最终数据处理。
以下部分介绍如何自定义这些方法以进程来自 Observable
的读取和写入操作结果。
访问读取操作结果
要访问权限读取操作检索的数据,请创建 Observable[T]
来存储操作结果。 然后,订阅该可观察对象并重写 Observer
类回调方法以进程结果。
此示例查询 restaurants
集合中 cuisine
值为 "Czech"
的文档。为了检索和进程结果,该示例为操作分配了 Observable[Document]
,并执行以下操作:
调用
subscribe()
方法订阅Observable
并将Observer
作为参数传递覆盖
onNext()
方法以打印每个检索到的文档,这些文档是Document
实例覆盖
onError()
方法以打印任何错误覆盖
onComplete()
方法,以在处理Observable
的所有结果后打印消息
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
访问写入操作结果
要访问权限写入操作检索的数据,请创建 Observable[T]
来存储操作结果。 然后,订阅该可观察对象并重写 Observer
类回调方法以进程结果。
此示例将文档插入到 cuisine
值为 "Nepalese"
的 restaurants
集合中。 为了检索和进程结果,该示例为操作分配了一个 Observable[InsertManyResult]
并执行以下操作:
调用
subscribe()
方法订阅Observable
并将Observer
作为参数传递覆盖
onNext()
方法以打印插入操作的结果,以InsertManyResult
形式返回覆盖
onError()
方法以打印任何错误覆盖
onComplete()
方法,以在处理Observable
的所有结果后打印消息
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
使用Lambda函数处理结果
您可以使用Lambda函数来简洁地进程操作结果,而无需显式覆盖 Observer
类中的回调函数。 这些函数允许您使用 =>
箭头表示法自定义 onNext()
、onError()
和 onComplete()
的实施。
提示
要学习;了解有关Lambda函数(也称为匿名函数)的更多信息,请参阅匿名函数维基百科条目。
例子
此示例在 restaurants
集合中查询 borough
字段的每个不同值。 该代码订阅 distinct()
方法返回的 Observable
,然后使用Lambda函数打印结果并处理错误:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
使用Futures 检索所有结果
您可以隐式订阅Observable
,并通过调用 toFuture()
方法聚合其结果。 当您对 Observable
调用 toFuture()
时,驾驶员会执行以下操作:
订阅
Observable
收集
Observable
发出的项并将其存储在Future
实例中
然后,您可以遍历 Future
以检索操作结果。
例子
此示例查询 restaurants
集合中 name
字段值为 "The Halal Guys"
的文档。 为了访问权限操作结果,代码将 Observable
转换为 Future
,等待 Future
收集每个结果,然后遍历这些结果:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: