For a project, We are trying to improve the existing performance by upgrading to mongo reactive driver from mongo sync drivers. When we upgraded to reactive drivers, the performance improvement for negligible. To verify any performance gap, I had written a code to push 30K documents ( document size ~256KB), the performance difference between the two driver’s were almost same. What can be done to improve the performance while upgrading to reactive drivers ?
Reactive Code :
Driver version :
org.mongodb
mongodb-driver-reactivestreams
5.0.1
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class MongoDBExample2 {
private static final Logger logger = LoggerFactory.getLogger(MongoDBExample2.class);
private static String str;
static {
try {
str = new String(Files.readAllBytes(Paths.get("<path-to-file-with-content>")));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MongoClient mongoClient = MongoClients.create("<connection-string>");
// Get a database and collection
MongoDatabase database = mongoClient.getDatabase("testdb");
MongoCollection<Document> collection = database.getCollection("mock1");
List<WriteModel<Document>> documents = getMockData();
CompletableFuture<BulkWriteResult> futureResult = bulkWriteDocuments(collection, documents);
futureResult.thenAccept(bulkWriteResult -> {
logger.debug("Bulk write completed: {}", bulkWriteResult);
}).exceptionally(throwable -> {
logger.error("Error occurred during bulk write: ", throwable);
return null;
});
futureResult.get();
mongoClient.close();
}
private static List<WriteModel<Document>> getMockData() {
List<WriteModel<Document>> docs = new ArrayList<>();
List<Document> documents = generateLargeDocument(30_000);
for (Document doc : documents) {
docs.add(new InsertOneModel<>(doc));
}
return docs;
}
public static CompletableFuture<BulkWriteResult> bulkWriteDocuments(MongoCollection<Document> collection, List<WriteModel<Document>> documents) {
CompletableFuture<BulkWriteResult> future = new CompletableFuture<>();
Instant start = Instant.now();
collection.bulkWrite(documents).subscribe(new Subscriber<BulkWriteResult>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}
@Override
public void onNext(BulkWriteResult bulkWriteResult) {
System.out.println(" Time taken : " + Duration.between(start, Instant.now()).getSeconds());
future.complete(bulkWriteResult);
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
if (!future.isDone()) {
future.completeExceptionally(new RuntimeException("Bulk write operation completed without emitting a result"));
}
}
});
return future;
}
private static List<Document> generateLargeDocument(int numDocuments) {
List<Document> documents = new ArrayList<>(numDocuments);
Instant op = Instant.now();
for (int i = 0; i < numDocuments; i++) {
Document document = new Document();
document.append("Idd", UUID.randomUUID().toString())
.append("content", str);
documents.add(document);
}
System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
return documents;
}
}
Sync Code:
Driver version:
org.mongodb
mongodb-driver
3.10.0
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class MongoDBExample {
private static final Logger logger = LoggerFactory.getLogger(MongoDBExample.class);
private static String str;
static {
try {
str = new String(Files.readAllBytes(Paths.get("<path-to-file-with-content>")));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
MongoClient mongoClient = MongoClients.create("<connection-string>");
MongoDatabase database = mongoClient.getDatabase("testdb");
MongoCollection<Document> collection = database.getCollection("mock1");
List<WriteModel<Document>> documents = getMockData();
Instant start = Instant.now();
collection.bulkWrite(documents);
System.out.println("Time taken to generate docs : " + Duration.between(start, Instant.now()).getSeconds());
mongoClient.close();
}
private static List<WriteModel<Document>> getMockData() {
List<WriteModel<Document>> docs = new ArrayList<>();
List<Document> documents = generateLargeDocument(30_000);
for (Document doc : documents) {
docs.add(new InsertOneModel<>(doc));
}
return docs;
}
private static List<Document> generateLargeDocument(int numDocuments) {
List<Document> documents = new ArrayList<>(numDocuments);
Instant op = Instant.now();
for (int i = 0; i < numDocuments; i++) {
Document document = new Document();
document.append("Idd", UUID.randomUUID().toString())
.append("content", str);
documents.add(document);
}
System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
return documents;
}
}
Time Taken by Reactive drivers to push 30K records : 2229 seconds
Time Taken by Sync drivers to push 30K records : 2259 seconds