Java Reactive Streams Driver

Read Data From MongoDB

On this page

  • Overview
  • Project Reactor Implementation
  • Sample Application
  • Find One
  • Find Multiple
  • Count Documents in a Collection
  • Count Documents Returned from a Query
  • Estimated Document Count
  • Retrieve Distinct Values
  • Monitor Data Changes

This page contains copyable code examples of Java Reactive Streams driver methods that you can use to read data from MongoDB.

Tip

To learn more about any of the methods shown on this page, see the link provided in each section.

To use an example from this page, copy the code example into the sample application or your own application. Be sure to replace all placeholders in the code examples, such as <connection string>, with the relevant values for your MongoDB deployment.

This guide uses the Project Reactor library to consume Publisher instances returned by the Java Reactive Streams driver methods. To learn more about the Project Reactor library and how to use it, see Getting Started in the Reactor documentation.

There are other ways to consume Publisher instances. You can use an alternative library, such as RxJava, or call Publisher.subscribe() directly and pass in your own implementation of a Subscriber.
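As a rough illustration of passing your own Subscriber, the following sketch uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, so that it runs without any dependencies. The class names, the CollectingSubscriber type, and the fixedPublisher() stand-in are hypothetical and are not part of the driver. With the driver, you would implement org.reactivestreams.Subscriber and pass it to the Publisher that a driver method returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class CustomSubscriberSketch {

    // Hypothetical subscriber: requests one item at a time and records every signal.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        boolean completed;
        Throwable error;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // nothing is delivered until demand is signalled
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            error = throwable;
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    // Stand-in for a driver Publisher (an assumption for this sketch):
    // emits fixed values synchronously and only as demand allows.
    static Flow.Publisher<String> fixedPublisher(List<String> values) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext; the loop below sees the new demand
                }
                emitting = true;
                while (demand > 0 && !done) {
                    if (index < values.size()) {
                        demand--;
                        subscriber.onNext(values.get(index++));
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        fixedPublisher(List.of("first", "second")).subscribe(subscriber);
        System.out.println(subscriber.items + " completed=" + subscriber.completed);
    }
}
```

Requesting one item at a time keeps the sketch easy to follow. A real Subscriber might request larger batches to reduce signalling overhead.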

The examples in this guide use the Flux.blockLast() method from Reactor to subscribe to a Publisher and block the current thread until the Publisher reaches its terminal state. To learn more about the Reactive Streams initiative, see Reactive Streams.
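To make "subscribe and block until the terminal state" more concrete, here is a dependency-free sketch of the same idea built on the JDK's java.util.concurrent.Flow API and a CountDownLatch. It is only an analogue of what Flux.blockLast() does for you, not Reactor's implementation. The class name, the emitAndBlock() method, and the five-second timeout are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BlockUntilTerminalSketch {

    // Publishes the values asynchronously, then blocks the calling thread
    // until the subscriber has seen onComplete or onError.
    static List<String> emitAndBlock(List<String> values) {
        List<String> received = new ArrayList<>();
        CountDownLatch terminal = new CountDownLatch(1);
        Throwable[] failure = new Throwable[1];

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // unbounded demand
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    failure[0] = throwable;
                    terminal.countDown();
                }

                @Override
                public void onComplete() {
                    terminal.countDown();
                }
            });
            values.forEach(publisher::submit); // delivered on another thread
        } // close() signals onComplete once the buffered items are delivered

        try {
            // This wait is the "blocking" part: the caller pauses here.
            if (!terminal.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (failure[0] != null) {
            throw new IllegalStateException(failure[0]);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitAndBlock(List.of("a", "b", "c")));
    }
}
```

Blocking like this is convenient in short sample programs. In a fully reactive application you would usually compose further operators instead of parking a thread.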

Important

Publishers Returned Are Cold

All Publisher instances returned by the Java Reactive Streams driver methods are cold, which means that the corresponding operation does not run unless you subscribe to the returned Publisher. We recommend subscribing to each returned Publisher only once, because subscribing more than once can lead to errors.
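The following sketch shows what "cold" means in practice, again using the JDK's java.util.concurrent.Flow interfaces so that it runs on its own. The coldCount() method is a hypothetical stand-in for a driver call and returns a made-up value. It only counts how often its simulated operation runs, and how the driver itself reacts to a second subscription may differ from this sketch.

```java
import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class ColdPublisherSketch {

    // Hypothetical stand-in for a driver call: the simulated operation runs
    // once per subscription, and only after the subscriber signals demand.
    static Flow.Publisher<Long> coldCount(AtomicInteger executions) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                executions.incrementAndGet(); // the simulated operation happens here
                subscriber.onNext(42L);       // made-up result
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Minimal subscriber that asks for a single item and remembers it.
    static final class OneShotSubscriber implements Flow.Subscriber<Long> {
        Long value;
        Throwable error;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }

        @Override
        public void onNext(Long item) {
            value = item;
        }

        @Override
        public void onError(Throwable throwable) {
            error = throwable;
        }

        @Override
        public void onComplete() {
        }
    }

    // Returns the execution count after creation, after one subscription, and after two.
    static int[] demo() {
        AtomicInteger executions = new AtomicInteger();
        Flow.Publisher<Long> publisher = coldCount(executions);
        int afterCreate = executions.get();   // creating the publisher does no work
        publisher.subscribe(new OneShotSubscriber());
        int afterFirst = executions.get();    // subscribing triggers the operation
        publisher.subscribe(new OneShotSubscriber());
        int afterSecond = executions.get();   // a second subscription repeats it
        return new int[] {afterCreate, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [0, 1, 2]
    }
}
```

In this sketch the second subscription silently repeats the work, which is one reason to treat each returned Publisher as single-use.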

You can use the following sample application to test the code examples on this page. To use the sample application, perform the following steps:

  1. Create a new Java project in your IDE.

  2. Install the Java Reactive Streams driver in your Java project.

  3. Install the Project Reactor library in your Java project.

  4. Copy the following code and paste it into a new Java file named ReadOperations.java.

  5. Copy a code example from this page and paste it on the specified lines in the file.

import com.mongodb.MongoException;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerApi;
import com.mongodb.ServerApiVersion;

import com.mongodb.reactivestreams.client.MongoCollection;

import org.bson.Document;
// Publisher is the declared type in the count examples on this page
import org.reactivestreams.Publisher;

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.mongodb.client.model.Filters.eq;

class ReadOperations {
    public static void main(String[] args) throws InterruptedException {
        // Replace the placeholder with your Atlas connection string
        String uri = "<connection string>";

        // Construct a ServerApi instance using the ServerApi.builder() method
        ServerApi serverApi = ServerApi.builder()
                .version(ServerApiVersion.V1)
                .build();

        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(uri))
                .serverApi(serverApi)
                .build();

        // Create a new client and connect to the server
        try (MongoClient mongoClient = MongoClients.create(settings)) {
            MongoDatabase database = mongoClient.getDatabase("<database name>");
            MongoCollection<Document> collection = database.getCollection("<collection name>");

            // Start example code here

            // End example code here
        }
    }
}

The following example retrieves the first document that matches the criteria specified by the given filter:

// first() returns a plain Publisher<Document>, not a FindPublisher<Document>
// (requires import org.reactivestreams.Publisher)
Publisher<Document> findDocPublisher = collection
        .find(eq("<field name>", "<value>")).first();
Flux.from(findDocPublisher)
        .doOnNext(System.out::println)
        .blockLast();

To learn more about the find().first() construct, see the Retrieve Data guide.

The following example retrieves all documents that match the criteria specified by the given filter:

FindPublisher<Document> findDocPublisher = collection
        .find(eq("<field name>", "<value>"));
Flux.from(findDocPublisher)
        .doOnNext(System.out::println)
        .blockLast();

To learn more about the find() method, see the Retrieve Data guide.

The following example returns the number of documents in the specified collection:

Publisher<Long> countPublisher = collection.countDocuments();
Flux.from(countPublisher)
        .doOnNext(System.out::println)
        .blockLast();

To learn more about the countDocuments() method, see the Count Documents guide.

The following example returns the number of documents in the specified collection that match the criteria specified by the given filter:

Publisher<Long> countPublisher = collection.countDocuments(
        eq("<field name>", "<value>"));
Flux.from(countPublisher)
        .doOnNext(System.out::println)
        .blockLast();

To learn more about the countDocuments() method, see the Count Documents guide.

The following example returns an approximate number of documents in the specified collection based on collection metadata:

Publisher<Long> countPublisher = collection.estimatedDocumentCount();
Flux.from(countPublisher)
        .doOnNext(System.out::println)
        .blockLast();

To learn more about the estimatedDocumentCount() method, see the Count Documents guide.

The following example returns all distinct values of the specified field name in a given collection:

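// The type argument must match the class you pass in place of <type>;
// DistinctPublisher<String> is correct only when <type> is String
DistinctPublisher<String> distinctPublisher = collection.distinct(
        "<field name>", <type>.class);
Flux.from(distinctPublisher)
        .doOnNext(System.out::println)
        .blockLast();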

To learn more about the distinct() method, see the Retrieve Distinct Field Values guide.

The following example creates a change stream for a given collection and prints out subsequent change events in that collection:

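ChangeStreamPublisher<Document> changePublisher = collection.watch();
// A change stream normally stays open, so expect blockLast() to keep this
// thread blocked until the stream fails or the program is stopped
Flux.from(changePublisher)
        .doOnNext(System.out::println)
        .blockLast();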

To learn more about the watch() method, see the Monitor Data Changes guide.
