ANNOUNCEMENT: Voyage AI joins MongoDB to power more accurate and trustworthy AI applications on Atlas.
Learn more
Docs 菜单

从 Observable 访问数据

在本指南中,您可以学习;了解如何从 Observable实例访问权限MongoDB操作的结果。

Observable 表示操作随时间推移发出的数据流。 要访问权限此数据,您可以创建一个订阅 ObservableObserver实例。

注意

Scala驾驶员基于MongoDB Java Reactive Streams驾驶员构建。 Observable 类扩展了Java Reactive Streams 中的 Publisher 类,并包含其他有助于进程结果的便捷方法。

要运行MongoDB操作并进程其数据,您必须从 Observable请求操作结果。驾驶员为返回任意数量结果的操作提供了 Observable 接口。不产生结果或只产生一个结果的操作(例如 findOne() 方法)会返回 SingleObservable[T][T] 参数化对应于 SingleObservable 处理的数据类型。

可以生成无限数量结果的操作会返回 Observable[T] 类型的实例。 某些操作会返回特定的 Observable 类型,这些类型提供了在订阅结果之前过滤和进程结果的其他方法。 以下列表描述了一些允许您将特定于操作的方法链接到 Observable 的类型:

  • FindObservable[T]:由 find() 方法返回

  • DistinctObservable[T]:由 distinct() 方法返回

  • AggregateObservable[T]:由 aggregate() 方法返回

您可以通过对操作的 Observable 调用 subscribe() 方法来请求操作结果。将 Observer 类的实例作为参数传递给 subscribe() 方法。此 ObserverObservable 接收操作结果。然后,您可以使用 Observer 类提供的方法打印结果、处理错误并执行其他数据处理。

要学习;了解有关处理结果的更多信息,请参阅以下API文档:

restaurantssample_restaurants本指南中的示例使用Atlas示例数据集的 数据库中的 集合。要从Scala应用程序访问权限此集合,请创建一个连接到Atlas 集群的MongoClient,并将以下值分配给 databasecollection 变量:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。

订阅Observable[T] 后,您可以使用 Observer 类提供的以下回调方法来访问权限操作结果或处理错误:

  • onNext(result: TResult):当 Observer 收到新结果时调用。 您可以通过重写此方法来定义处理结果的逻辑。

  • onError(e: Throwable):当操作生成错误并阻止 ObserverObservable 接收更多数据时调用。 您可以通过重写此方法来定义错误处理逻辑。

  • onComplete():当 Observer 消耗完来自 Observable 的所有结果时调用。您可以通过重写此方法来执行任何最终数据处理。

以下部分介绍如何自定义这些方法以进程来自 Observable 的读取和写入操作结果。

要访问权限读取操作检索的数据,请创建 Observable[T] 来存储操作结果。 然后,订阅该可观察对象并重写 Observer 类回调方法以进程结果。

此示例查询 restaurants集合中 cuisine 值为 "Czech" 的文档。为了检索和进程结果,该示例为操作分配了 Observable[Document],并执行以下操作:

  • 调用 subscribe() 方法订阅Observable 并将 Observer 作为参数传递

  • 覆盖 onNext() 方法以打印每个检索到的文档,这些文档是 Document 实例

  • 覆盖 onError() 方法以打印任何错误

  • 覆盖 onComplete() 方法,以在处理 Observable 的所有结果后打印消息

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

要访问权限写入操作检索的数据,请创建 Observable[T] 来存储操作结果。 然后,订阅该可观察对象并重写 Observer 类回调方法以进程结果。

此示例将文档插入到 cuisine 值为 "Nepalese"restaurants集合中。 为了检索和进程结果,该示例为操作分配了一个 Observable[InsertManyResult] 并执行以下操作:

  • 调用 subscribe() 方法订阅Observable 并将 Observer 作为参数传递

  • 覆盖 onNext() 方法以打印插入操作的结果,以 InsertManyResult 形式返回

  • 覆盖 onError() 方法以打印任何错误

  • 覆盖 onComplete() 方法,以在处理 Observable 的所有结果后打印消息

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

您可以使用Lambda函数来简洁地进程操作结果,而无需显式覆盖 Observer 类中的回调函数。 这些函数允许您使用 => 箭头表示法自定义 onNext()onError()onComplete() 的实施。

提示

要学习;了解有关Lambda函数(也称为匿名函数)的更多信息,请参阅匿名函数维基百科条目。

此示例在 restaurants集合中查询 borough字段的每个不同值。 该代码订阅 distinct() 方法返回的 Observable,然后使用Lambda函数打印结果并处理错误:

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

您可以隐式订阅Observable,并通过调用 toFuture() 方法聚合其结果。 当您对 Observable 调用 toFuture() 时,驾驶员会执行以下操作:

  • 订阅 Observable

  • 收集 Observable 发出的项并将其存储在 Future实例中

然后,您可以遍历 Future 以检索操作结果。

重要

如果您的 Observable 包含大量文档,则调用 toFuture() 方法将消耗大量内存。如果您期望得到大型结果设立,请考虑使用回调Lambda函数来访问权限结果。

此示例查询 restaurants集合中 name字段值为 "The Halal Guys" 的文档。 为了访问权限操作结果,代码将 Observable 转换为 Future,等待 Future 收集每个结果,然后遍历这些结果:

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: