Explore Developer Center's New Chatbot! MongoDB AI Chatbot can be accessed at the top of your navigation to answer all your MongoDB questions.

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
MongoDB Developer
Java
plus
Sign in to follow topics
MongoDB Developer Centerchevron-right
Developer Topicschevron-right
Languageschevron-right
Javachevron-right

Java - MongoDB Multi-Document ACID Transactions

Maxime Beugnet10 min read • Published Aug 15, 2018 • Updated Mar 01, 2024
MongoDBJava
SNIPPET
Facebook Icontwitter iconlinkedin icon
Rate this quickstart
star-empty
star-empty
star-empty
star-empty
star-empty

Introduction

Introduced in June 2018 with MongoDB 4.0, multi-document ACID transactions are now supported.
But wait... Does that mean MongoDB did not support transactions before that? No, MongoDB has consistently supported transactions, initially in the form of single-document transactions.
MongoDB 4.0 extends these transactional guarantees across multiple documents, multiple statements, multiple collections, and multiple databases. What good would a database be without any form of transactional data integrity guarantee?
Before delving into the details, you can access the code and experiment with multi-document ACID transactions.
1git clone git@github.com:mongodb-developer/java-quick-start.git

Quick start

Last update: February 28th, 2024

  • Update to Java 21
  • Update Java Driver to 5.0.0
  • Update logback-classic to 1.2.13

Requirements

  • Java 21
  • Maven 3.8.7
  • Docker (optional)

Step 1: start MongoDB

Get started with MongoDB Atlas and get a free cluster.
Or you can start an ephemeral single node replica set using Docker for testing quickly:
1docker run --rm -d -p 27017:27017 -h $(hostname) --name mongo mongo:7.0.5 --replSet=RS && sleep 3 && docker exec mongo mongosh --quiet --eval "rs.initiate();"

Step 2: start Java

This demo contains two main programs: ChangeStreams.java and Transactions.java.
  • The ChangeSteams class enables you to receive notifications of any data changes within the two collections used in this tutorial.
  • The Transactions class is the demo itself.
You need two shells to run them.
First shell:
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.transactions.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
Second shell:
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.transactions.Transactions" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
Note: Always execute the ChangeStreams program first because it creates the product collection with the required JSON Schema.
Let’s compare our existing single-document transactions with MongoDB 4.0’s ACID-compliant multi-document transactions and see how we can leverage this new feature with Java.

Prior to MongoDB 4.0

Even in MongoDB 3.6 and earlier, every write operation is represented as a transaction scoped to the level of an individual document in the storage layer. Because the document model brings together related data that would otherwise be modeled across separate parent-child tables in a tabular schema, MongoDB’s atomic single-document operations provide transaction semantics that meet the data integrity needs of the majority of applications.
Every typical write operation modifying multiple documents actually happens in several independent transactions: one for each document.
Let’s take an example with a very simple stock management application.
First of all, I need a MongoDB replica set, so please follow the instructions given above to start MongoDB.
Now, let’s insert the following documents into a product collection:
1db.product.insertMany([
2 { "_id" : "beer", "price" : NumberDecimal("3.75"), "stock" : NumberInt(5) },
3 { "_id" : "wine", "price" : NumberDecimal("7.5"), "stock" : NumberInt(3) }
4])
Let’s imagine there is a sale on, and we want to offer our customers a 20% discount on all our products.
But before applying this discount, we want to monitor when these operations are happening in MongoDB with Change Streams.
Execute the following in a MongoDB shell:
1cursor = db.product.watch([{$match: {operationType: "update"}}]);
2while (!cursor.isClosed()) {
3 let next = cursor.tryNext()
4 while (next !== null) {
5 printjson(next);
6 next = cursor.tryNext()
7 }
8}
Keep this shell on the side, open another MongoDB shell, and apply the discount:
1RS [direct: primary] test> db.product.updateMany({}, {$mul: {price:0.8}})
2{
3 acknowledged: true,
4 insertedId: null,
5 matchedCount: 2,
6 modifiedCount: 2,
7 upsertedCount: 0
8}
9RS [direct: primary] test> db.product.find().pretty()
10[
11 { _id: 'beer', price: Decimal128("3.00000000000000000"), stock: 5 },
12 { _id: 'wine', price: Decimal128("6.0000000000000000"), stock: 3 }
13]
As you can see, both documents were updated with a single command line but not in a single transaction. Here is what we can see in the change stream shell:
1{
2 _id: {
3 _data: '8265580539000000012B042C0100296E5A1004A7F55A5B35BD4C7DB2CD56C6CFEA9C49463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C6265657200000004'
4 },
5 operationType: 'update',
6 clusterTime: Timestamp({ t: 1700267321, i: 1 }),
7 wallTime: ISODate("2023-11-18T00:28:41.601Z"),
8 ns: {
9 db: 'test',
10 coll: 'product'
11 },
12 documentKey: {
13 _id: 'beer'
14 },
15 updateDescription: {
16 updatedFields: {
17 price: Decimal128("3.00000000000000000")
18 },
19 removedFields: [],
20 truncatedArrays: []
21 }
22}
23{
24 _id: {
25 _data: '8265580539000000022B042C0100296E5A1004A7F55A5B35BD4C7DB2CD56C6CFEA9C49463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C77696E6500000004'
26 },
27 operationType: 'update',
28 clusterTime: Timestamp({ t: 1700267321, i: 2 }),
29 wallTime: ISODate("2023-11-18T00:28:41.601Z"),
30 ns: {
31 db: 'test',
32 coll: 'product'
33 },
34 documentKey: {
35 _id: 'wine'
36 },
37 updateDescription: {
38 updatedFields: {
39 price: Decimal128("6.0000000000000000")
40 },
41 removedFields: [],
42 truncatedArrays: []
43 }
44}
As you can see, the cluster times (see the clusterTime key) of the two operations are different: The operations occurred during the same second but the counter of the timestamp has been incremented by one.
Thus, here each document is updated one at a time, and even if this happens really fast, someone else could read the documents while the update is running and see only one of the two products with the discount.
Most of the time, this is something you can tolerate in your MongoDB database because, as much as possible, we try to embed tightly linked (or related) data in the same document.
Consequently, two updates on the same document occur within a single transaction:
1RS [direct: primary] test> db.product.updateOne({_id: "wine"},{$inc: {stock:1}, $set: {description : "It's the best wine on Earth"}})
2{
3 acknowledged: true,
4 insertedId: null,
5 matchedCount: 1,
6 modifiedCount: 1,
7 upsertedCount: 0
8}
9RS [direct: primary] test> db.product.findOne({_id: "wine"})
10{
11 _id: 'wine',
12 price: Decimal128("6.0000000000000000"),
13 stock: 4,
14 description: 'It's the best wine on Earth'
15}
However, sometimes, you cannot model all of your related data in a single document, and there are a lot of valid reasons for choosing not to embed documents.

MongoDB 4.0 with multi-document ACID transactions

Multi-document ACID transactions in MongoDB closely resemble what you may already be familiar with in traditional relational databases.
MongoDB’s transactions are a conversational set of related operations that must atomically commit or fully roll back with all-or-nothing execution.
Transactions are used to make sure operations are atomic even across multiple collections or databases. Consequently, with snapshot isolation reads, another user can only observe either all the operations or none of them.
Let’s now add a shopping cart to our example.
For this example, two collections are required because we are dealing with two different business entities: the stock management and the shopping cart each client can create during shopping. The lifecycles of each document in these collections are different.
A document in the product collection represents an item I’m selling. This contains the current price of the product and the current stock. I created a POJO to represent it: Product.java.
1{ "_id" : "beer", "price" : NumberDecimal("3"), "stock" : NumberInt(5) }
A shopping cart is created when a client adds their first item in the cart and is removed when the client proceeds to check out or leaves the website. I created a POJO to represent it: Cart.java.
1{
2 "_id" : "Alice",
3 "items" : [
4 {
5 "price" : NumberDecimal("3"),
6 "productId" : "beer",
7 "quantity" : NumberInt(2)
8 }
9 ]
10}
The challenge here resides in the fact that I cannot sell more than I possess: If I have five beers to sell, I cannot have more than five beers distributed across the different client carts.
To ensure that, I have to make sure that the operation creating or updating the client cart is atomic with the stock update. That’s where the multi-document transaction comes into play. The transaction must fail in case someone tries to buy something I do not have in my stock. I will add a constraint on the product stock:
1db.product.drop()
2db.createCollection("product", {
3 validator: {
4 $jsonSchema: {
5 bsonType: "object",
6 required: [ "_id", "price", "stock" ],
7 properties: {
8 _id: {
9 bsonType: "string",
10 description: "must be a string and is required"
11 },
12 price: {
13 bsonType: "decimal",
14 minimum: 0,
15 description: "must be a non-negative decimal and is required"
16 },
17 stock: {
18 bsonType: "int",
19 minimum: 0,
20 description: "must be a non-negative integer and is required"
21 }
22 }
23 }
24 }
25})
Note that this is already included in the Java code of the ChangeStreams class.
To monitor our example, we are going to use MongoDB Change Streams that were introduced in MongoDB 3.6.
In ChangeStreams.java, I am going to monitor the database test which contains our two collections. It'll print each operation with its associated cluster time.
1package com.mongodb.quickstart.transactions;
2
3import com.mongodb.ConnectionString;
4import com.mongodb.MongoClientSettings;
5import com.mongodb.client.MongoClient;
6import com.mongodb.client.MongoClients;
7import com.mongodb.client.MongoDatabase;
8import com.mongodb.client.model.CreateCollectionOptions;
9import com.mongodb.client.model.ValidationAction;
10import com.mongodb.client.model.ValidationOptions;
11import org.bson.BsonDocument;
12
13import static com.mongodb.client.model.changestream.FullDocument.UPDATE_LOOKUP;
14
15public class ChangeStreams {
16
17 private static final String CART = "cart";
18 private static final String PRODUCT = "product";
19
20 public static void main(String[] args) {
21 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri"));
22 MongoClientSettings clientSettings = MongoClientSettings.builder()
23 .applyConnectionString(connectionString)
24 .build();
25 try (MongoClient client = MongoClients.create(clientSettings)) {
26 MongoDatabase db = client.getDatabase("test");
27 System.out.println("Dropping the '" + db.getName() + "' database.");
28 db.drop();
29 System.out.println("Creating the '" + CART + "' collection.");
30 db.createCollection(CART);
31 System.out.println("Creating the '" + PRODUCT + "' collection with a JSON Schema.");
32 db.createCollection(PRODUCT, productJsonSchemaValidator());
33 System.out.println("Watching the collections in the DB " + db.getName() + "...");
34 db.watch()
35 .fullDocument(UPDATE_LOOKUP)
36 .forEach(doc -> System.out.println(doc.getClusterTime() + " => " + doc.getFullDocument()));
37 }
38 }
39
40 private static CreateCollectionOptions productJsonSchemaValidator() {
41 String jsonSchema = """
42 {
43 "$jsonSchema": {
44 "bsonType": "object",
45 "required": ["_id", "price", "stock"],
46 "properties": {
47 "_id": {
48 "bsonType": "string",
49 "description": "must be a string and is required"
50 },
51 "price": {
52 "bsonType": "decimal",
53 "minimum": 0,
54 "description": "must be a non-negative decimal and is required"
55 },
56 "stock": {
57 "bsonType": "int",
58 "minimum": 0,
59 "description": "must be a non-negative integer and is required"
60 }
61 }
62 }
63 }""";
64 return new CreateCollectionOptions().validationOptions(
65 new ValidationOptions().validationAction(ValidationAction.ERROR)
66 .validator(BsonDocument.parse(jsonSchema)));
67 }
68}
In this example, we have five beers to sell.
Alice wants to buy two beers, but we are not going to use a multi-document transaction for this. We will observe in the change streams two operations at two different cluster times:
  • One creating the cart
  • One updating the stock
Then, Alice adds two more beers to her cart, and we are going to use a transaction this time. The result in the change stream will be two operations happening at the same cluster time.
Finally, she will try to order two extra beers but the jsonSchema validator will fail the product update (as there is only one in stock) and result in a rollback. We will not see anything in the change stream. Below is the source code for Transaction.java:
1package com.mongodb.quickstart.transactions;
2
3import com.mongodb.*;
4import com.mongodb.client.*;
5import com.mongodb.quickstart.transactions.models.Cart;
6import com.mongodb.quickstart.transactions.models.Product;
7import org.bson.BsonDocument;
8import org.bson.codecs.configuration.CodecRegistry;
9import org.bson.codecs.pojo.PojoCodecProvider;
10import org.bson.conversions.Bson;
11
12import java.math.BigDecimal;
13import java.util.ArrayList;
14import java.util.Collections;
15import java.util.List;
16
17import static com.mongodb.client.model.Filters.*;
18import static com.mongodb.client.model.Updates.inc;
19import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
20import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
21
22public class Transactions {
23
24 private static final BigDecimal BEER_PRICE = BigDecimal.valueOf(3);
25 private static final String BEER_ID = "beer";
26 private static final Bson filterId = eq("_id", BEER_ID);
27 private static final Bson filterAlice = eq("_id", "Alice");
28 private static final Bson matchBeer = elemMatch("items", eq("productId", "beer"));
29 private static final Bson incrementTwoBeers = inc("items.$.quantity", 2);
30 private static final Bson decrementTwoBeers = inc("stock", -2);
31 private static MongoCollection<Cart> cartCollection;
32 private static MongoCollection<Product> productCollection;
33
34 public static void main(String[] args) {
35 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri"));
36 CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build());
37 CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry);
38 MongoClientSettings clientSettings = MongoClientSettings.builder()
39 .applyConnectionString(connectionString)
40 .codecRegistry(codecRegistry)
41 .build();
42 try (MongoClient client = MongoClients.create(clientSettings)) {
43 MongoDatabase db = client.getDatabase("test");
44 cartCollection = db.getCollection("cart", Cart.class);
45 productCollection = db.getCollection("product", Product.class);
46 transactionsDemo(client);
47 }
48 }
49
50 private static void transactionsDemo(MongoClient client) {
51 clearCollections();
52 insertProductBeer();
53 printDatabaseState();
54 System.out.println("""
55 ######### NO TRANSACTION #########
56 Alice wants 2 beers.
57 We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.
58 The 2 actions are correlated but can not be executed at the same cluster time.
59 Any error blocking one operation could result in stock error or a sale of beer that we can't fulfill as we have no stock.
60 ------------------------------------""");
61 aliceWantsTwoBeers();
62 sleep();
63 removingBeersFromStock();
64 System.out.println("####################################\n");
65 printDatabaseState();
66 sleep();
67 System.out.println("""
68 ######### WITH TRANSACTION #########
69 Alice wants 2 extra beers.
70 Now we can update the 2 collections simultaneously.
71 The 2 operations only happen when the transaction is committed.
72 ------------------------------------""");
73 aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(client);
74 sleep();
75 System.out.println("""
76 ######### WITH TRANSACTION #########
77 Alice wants 2 extra beers.
78 This time we do not have enough beers in stock so the transaction will rollback.
79 ------------------------------------""");
80 aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(client);
81 }
82
83 private static void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(MongoClient client) {
84 ClientSession session = client.startSession();
85 try {
86 session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build());
87 aliceWantsTwoExtraBeers(session);
88 sleep();
89 removingBeerFromStock(session);
90 session.commitTransaction();
91 } catch (MongoException e) {
92 session.abortTransaction();
93 System.out.println("####### ROLLBACK TRANSACTION #######");
94 } finally {
95 session.close();
96 System.out.println("####################################\n");
97 printDatabaseState();
98 }
99 }
100
101 private static void removingBeersFromStock() {
102 System.out.println("Trying to update beer stock : -2 beers.");
103 try {
104 productCollection.updateOne(filterId, decrementTwoBeers);
105 } catch (MongoException e) {
106 System.out.println("######## MongoException ########");
107 System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
108 throw e;
109 }
110 }
111
112 private static void removingBeerFromStock(ClientSession session) {
113 System.out.println("Trying to update beer stock : -2 beers.");
114 try {
115 productCollection.updateOne(session, filterId, decrementTwoBeers);
116 } catch (MongoException e) {
117 System.out.println("######## MongoException ########");
118 System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
119 throw e;
120 }
121 }
122
123 private static void aliceWantsTwoBeers() {
124 System.out.println("Alice adds 2 beers in her cart.");
125 cartCollection.insertOne(new Cart("Alice", List.of(new Cart.Item(BEER_ID, 2, BEER_PRICE))));
126 }
127
128 private static void aliceWantsTwoExtraBeers(ClientSession session) {
129 System.out.println("Updating Alice cart : adding 2 beers.");
130 cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementTwoBeers);
131 }
132
133 private static void insertProductBeer() {
134 productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE));
135 }
136
137 private static void clearCollections() {
138 productCollection.deleteMany(new BsonDocument());
139 cartCollection.deleteMany(new BsonDocument());
140 }
141
142 private static void printDatabaseState() {
143 System.out.println("Database state:");
144 printProducts(productCollection.find().into(new ArrayList<>()));
145 printCarts(cartCollection.find().into(new ArrayList<>()));
146 System.out.println();
147 }
148
149 private static void printProducts(List<Product> products) {
150 products.forEach(System.out::println);
151 }
152
153 private static void printCarts(List<Cart> carts) {
154 if (carts.isEmpty()) {
155 System.out.println("No carts...");
156 } else {
157 carts.forEach(System.out::println);
158 }
159 }
160
161 private static void sleep() {
162 System.out.println("Sleeping 1 second...");
163 try {
164 Thread.sleep(1000);
165 } catch (InterruptedException e) {
166 System.err.println("Oops!");
167 e.printStackTrace();
168 }
169 }
170}
Here is the console of the change stream:
1Dropping the 'test' database.
2Creating the 'cart' collection.
3Creating the 'product' collection with a JSON Schema.
4Watching the collections in the DB test...
5Timestamp{value=7304460075832180737, seconds=1700702141, inc=1} => Document{{_id=beer, price=3, stock=5}}
6Timestamp{value=7304460075832180738, seconds=1700702141, inc=2} => Document{{_id=Alice, items=[Document{{price=3, productId=beer, quantity=2}}]}}
7Timestamp{value=7304460080127148033, seconds=1700702142, inc=1} => Document{{_id=beer, price=3, stock=3}}
8Timestamp{value=7304460088717082625, seconds=1700702144, inc=1} => Document{{_id=Alice, items=[Document{{price=3, productId=beer, quantity=4}}]}}
9Timestamp{value=7304460088717082625, seconds=1700702144, inc=1} => Document{{_id=beer, price=3, stock=1}}
As you can see here, we only get five operations because the two last operations were never committed to the database, and therefore, the change stream has nothing to show.
  • The first operation is the product collection initialization (create the product document for the beers).
  • The second and third operations are the first two beers Alice adds to her cart without a multi-doc transaction. Notice that the two operations do not happen at the same cluster time.
  • The two last operations are the two additional beers Alice adds to her cart with a multi-doc transaction. Notice that this time the two operations are atomic, and they are happening exactly at the same cluster time.
Here is the console of the transaction Java process that sums up everything I said earlier.
1Database state:
2Product{id='beer', stock=5, price=3}
3No carts...
4
5######### NO TRANSACTION #########
6Alice wants 2 beers.
7We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.
8The 2 actions are correlated but can not be executed on the same cluster time.
9Any error blocking one operation could result in stock error or a sale of beer that we can't fulfill as we have no stock.
10------------------------------------
11Alice adds 2 beers in her cart.
12Sleeping 1 second...
13Trying to update beer stock : -2 beers.
14####################################
15
16Database state:
17Product{id='beer', stock=3, price=3}
18Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]}
19
20Sleeping 1 second...
21######### WITH TRANSACTION #########
22Alice wants 2 extra beers.
23Now we can update the 2 collections simultaneously.
24The 2 operations only happen when the transaction is committed.
25------------------------------------
26Updating Alice cart : adding 2 beers.
27Sleeping 1 second...
28Trying to update beer stock : -2 beers.
29####################################
30
31Database state:
32Product{id='beer', stock=1, price=3}
33Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}
34
35Sleeping 1 second...
36######### WITH TRANSACTION #########
37Alice wants 2 extra beers.
38This time we do not have enough beers in stock so the transaction will rollback.
39------------------------------------
40Updating Alice cart : adding 2 beers.
41Sleeping 1 second...
42Trying to update beer stock : -2 beers.
43######## MongoException ########
44##### STOCK CANNOT BE NEGATIVE #####
45####### ROLLBACK TRANSACTION #######
46####################################
47
48Database state:
49Product{id='beer', stock=1, price=3}
50Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

Next steps

Thanks for taking the time to read my post. I hope you found it useful and interesting. As a reminder, all the code is available on the GitHub repository for you to experiment.
If you're seeking an easy way to begin with MongoDB, you can achieve that in just five clicks using our MongoDB Atlas cloud database service.

Facebook Icontwitter iconlinkedin icon
Rate this quickstart
star-empty
star-empty
star-empty
star-empty
star-empty
Related
Tutorial

Integrating MongoDB With Amazon Managed Streaming for Apache Kafka (MSK)


Sep 17, 2024 | 7 min read
Tutorial

How to Migrate PostgreSQL to MongoDB With Confluent Kafka


Aug 30, 2024 | 10 min read
Quickstart

Java - Mapping POJOs


Mar 01, 2024 | 5 min read
Tutorial

Java Meets Queryable Encryption: Developing a Secure Bank Account Application


Oct 08, 2024 | 14 min read
Table of Contents