Docs 菜单
Docs 主页
/ / /
Java Reactive Streams 驱动程序

从MongoDB读取数据

在此页面上

  • Overview
  • 项目 Reactor 实施
  • 示例应用程序
  • 找到一个
  • 查找多个
  • 对集合中的文档进行计数
  • 对查询返回的文档进行计数
  • 估计文档计数
  • Retrieve Distinct Values
  • 监控数据变化

本页包含Java Reactive Streams驾驶员方法的可复制代码示例,您可以使用这些方法从MongoDB读取数据。

提示

要了解有关此页面上显示的任何方法的更多信息,请参阅每个部分中提供的链接。

要使用本页中的示例,请将代码示例复制到示例应用程序或您自己的应用程序中。 请务必将代码示例中的所有占位符(例如 <connection string> )替换为 MongoDB 部署的相关值。

本指南使用 Project ReactorPublisher 库来使用Java Reactive Streams驾驶员方法返回的 实例。要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 Reactor 文档中的“入门”

还有其他方法可以使用Publisher实例。 您可以使用许多替代库之一,例如 RxJava 或直接调用Publisher.subscribe() 并传递您自己的Subscriber 实施。

本指南中的示例使用 Reactor 中的Flux.blockLast()方法订阅Publisher并区块当前线程,直到Publisher达到其终止状态。 要学习;了解有关 Reactive Streams 计划的更多信息,请参阅 Reactive Streams。

重要

返回的发布者处于冷状态

Java Reactive Streams驾驶员方法返回的所有Publisher实例都是冷实例,这意味着除非您订阅返回的Publisher ,否则不会发生相应的操作。 我们建议仅订阅返回的Publisher一次,因为订阅多次可能会导致错误。

您可以使用以下示例应用程序来测试本页上的代码示例。 要使用示例应用程序,请执行以下步骤:

  1. 在 IDE 中创建一个新的Java项目。

  2. 在Java项目中安装Java Reactive Streams驾驶员。

  3. 安装 Project Reactor 库 在您的Java项目中。

  4. 复制以下代码并将其粘贴到名为ReadOperations.java的新Java文件中。

  5. 从此页面复制代码示例,并将其粘贴到文件中的指定行。

1import com.mongodb.MongoException;
2import com.mongodb.ConnectionString;
3import com.mongodb.MongoClientSettings;
4import com.mongodb.ServerApi;
5import com.mongodb.ServerApiVersion;
6
7import com.mongodb.reactivestreams.client.MongoCollection;
8
9import org.bson.Document;
10
11import com.mongodb.reactivestreams.client.MongoClient;
12import com.mongodb.reactivestreams.client.MongoClients;
13import com.mongodb.reactivestreams.client.MongoDatabase;
14import com.mongodb.reactivestreams.client.FindPublisher;
15import com.mongodb.reactivestreams.client.DistinctPublisher;
16import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
17import reactor.core.publisher.Flux;
18
19import java.util.ArrayList;
20import java.util.Arrays;
21import java.util.List;
22
23import static com.mongodb.client.model.Filters.eq;
24
25class ReadOperations {
26 public static void main(String[] args) throws InterruptedException {
27 // Replace the placeholder with your Atlas connection string
28 String uri = "<connection string>";
29
30 // Construct a ServerApi instance using the ServerApi.builder() method
31 ServerApi serverApi = ServerApi.builder()
32 .version(ServerApiVersion.V1)
33 .build();
34
35 MongoClientSettings settings = MongoClientSettings.builder()
36 .applyConnectionString(new ConnectionString(uri))
37 .serverApi(serverApi)
38 .build();
39
40 // Create a new client and connect to the server
41 try (MongoClient mongoClient = MongoClients.create(settings)) {
42 MongoDatabase database = mongoClient.getDatabase("<database name>");
43 MongoCollection<Document> collection = database.getCollection("<collection name>");
44
45 // Start example code here
46
47 // End example code here
48 }
49 }
50}

以下示例检索与给定过滤指定的条件相匹配的文档:

FindPublisher<Document> findDocPublisher = collection
.find(eq("<field name>", "<value>")).first();
Flux.from(findDocPublisher)
.doOnNext(System.out::println)
.blockLast();

要学习;了解有关find().first()构造的更多信息,请参阅检索数据指南。

以下示例检索与给定过滤指定的条件匹配的所有文档:

FindPublisher<Document> findDocPublisher = collection
.find(eq("<field name>", "<value>"));
Flux.from(findDocPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解有关find()方法的更多信息,请参阅检索数据指南。

以下示例返回指定集合中的文档数:

Publisher<Long> countPublisher = collection.countDocuments();
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要学习;了解有关countDocuments()方法的更多信息,请参阅文档计数指南。

以下示例返回指定集合中与给定过滤指定的条件相匹配的文档数:

Publisher<Long> countPublisher = collection.countDocuments(
eq("<field name>", "<value>"));
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要学习;了解有关countDocuments()方法的更多信息,请参阅文档计数指南。

以下示例根据集合元数据返回指定集合中文档的大致数量:

Publisher<Long> countPublisher = collection.estimatedDocumentCount();
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要学习;了解有关estimatedDocumentCount()方法的更多信息,请参阅文档计数指南。

以下示例返回给定集合中指定字段名称的所有非重复值:

DistinctPublisher<String> distinctPublisher = collection.distinct(
"<field name>", <type>.class);
Flux.from(distinctPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解有关distinct()方法的更多信息,请参阅“检索不同字段值”指南。

以下示例为给定集合创建变更流,并打印该集合中的后续变更事件:

ChangeStreamPublisher<Document> changePublisher = collection.watch();
Flux.from(changePublisher)
.doOnNext(System.out::println)
.blockLast();

要学习;了解有关watch()方法的更多信息,请参阅《监控数据更改》指南。

后退

运行数据库命令