Java - MongoDB Multi-Document ACID Transactions
Rate this quickstart
Introduced in June 2018 with MongoDB 4.0, multi-document ACID transactions are now supported.
But wait... Does that mean MongoDB did not support transactions before that?
No, MongoDB has consistently supported transactions, initially in the form of single-document transactions.
MongoDB 4.0 extends these transactional guarantees across multiple documents, multiple statements, multiple collections,
and multiple databases. What good would a database be without any form of transactional data integrity guarantee?
Before delving into the details, you can access the code and experiment with multi-document ACID
transactions.
1 git clone git@github.com:mongodb-developer/java-quick-start.git
- Update to Java 21
- Update Java Driver to 5.0.0
- Update
logback-classic
to 1.2.13
- Java 21
- Maven 3.8.7
- Docker (optional)
Or you can start an ephemeral single node replica set using Docker for testing quickly:
1 docker run --rm -d -p 27017:27017 -h $(hostname) --name mongo mongo:7.0.5 --replSet=RS && sleep 3 && docker exec mongo mongosh --quiet --eval "rs.initiate();"
This demo contains two main programs:
ChangeStreams.java
and Transactions.java
.- The
ChangeSteams
class enables you to receive notifications of any data changes within the two collections used in this tutorial. - The
Transactions
class is the demo itself.
You need two shells to run them.
First shell:
1 mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.transactions.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
Second shell:
1 mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.transactions.Transactions" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
Note: Always execute the
ChangeStreams
program first because it creates the product
collection with the
required JSON Schema.Let’s compare our existing single-document transactions with MongoDB 4.0’s ACID-compliant multi-document transactions
and see how we can leverage this new feature with Java.
Even in MongoDB 3.6 and earlier, every write operation is represented as a transaction scoped to the level of an
individual document in the storage layer. Because the document model brings together related data that would otherwise
be modeled across separate parent-child tables in a tabular schema, MongoDB’s atomic single-document operations provide
transaction semantics that meet the data integrity needs of the majority of applications.
Every typical write operation modifying multiple documents actually happens in several independent transactions: one for
each document.
Let’s take an example with a very simple stock management application.
First of all, I need a MongoDB replica set, so please follow the
instructions given above to start MongoDB.
Now, let’s insert the following documents into a
product
collection:1 db.product.insertMany([ 2 { "_id" : "beer", "price" : NumberDecimal("3.75"), "stock" : NumberInt(5) }, 3 { "_id" : "wine", "price" : NumberDecimal("7.5"), "stock" : NumberInt(3) } 4 ])
Let’s imagine there is a sale on, and we want to offer our customers a 20% discount on all our products.
But before applying this discount, we want to monitor when these operations are happening in MongoDB with Change
Streams.
1 cursor = db.product.watch([{$match: {operationType: "update"}}]); 2 while (!cursor.isClosed()) { 3 let next = cursor.tryNext() 4 while (next !== null) { 5 printjson(next); 6 next = cursor.tryNext() 7 } 8 }
Keep this shell on the side, open another MongoDB shell, and apply the discount:
1 RS [direct: primary] test> db.product.updateMany({}, {$mul: {price:0.8}}) 2 { 3 acknowledged: true, 4 insertedId: null, 5 matchedCount: 2, 6 modifiedCount: 2, 7 upsertedCount: 0 8 } 9 RS [direct: primary] test> db.product.find().pretty() 10 [ 11 { _id: 'beer', price: Decimal128("3.00000000000000000"), stock: 5 }, 12 { _id: 'wine', price: Decimal128("6.0000000000000000"), stock: 3 } 13 ]
As you can see, both documents were updated with a single command line but not in a single transaction.
Here is what we can see in the change stream shell:
1 { 2 _id: { 3 _data: '8265580539000000012B042C0100296E5A1004A7F55A5B35BD4C7DB2CD56C6CFEA9C49463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C6265657200000004' 4 }, 5 operationType: 'update', 6 clusterTime: Timestamp({ t: 1700267321, i: 1 }), 7 wallTime: ISODate("2023-11-18T00:28:41.601Z"), 8 ns: { 9 db: 'test', 10 coll: 'product' 11 }, 12 documentKey: { 13 _id: 'beer' 14 }, 15 updateDescription: { 16 updatedFields: { 17 price: Decimal128("3.00000000000000000") 18 }, 19 removedFields: [], 20 truncatedArrays: [] 21 } 22 } 23 { 24 _id: { 25 _data: '8265580539000000022B042C0100296E5A1004A7F55A5B35BD4C7DB2CD56C6CFEA9C49463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C77696E6500000004' 26 }, 27 operationType: 'update', 28 clusterTime: Timestamp({ t: 1700267321, i: 2 }), 29 wallTime: ISODate("2023-11-18T00:28:41.601Z"), 30 ns: { 31 db: 'test', 32 coll: 'product' 33 }, 34 documentKey: { 35 _id: 'wine' 36 }, 37 updateDescription: { 38 updatedFields: { 39 price: Decimal128("6.0000000000000000") 40 }, 41 removedFields: [], 42 truncatedArrays: [] 43 } 44 }
As you can see, the cluster times (see the
clusterTime
key) of the two operations are different: The operations
occurred during the same second but the counter of the timestamp has been incremented by one.Thus, here each document is updated one at a time, and even if this happens really fast, someone else could read the
documents while the update is running and see only one of the two products with the discount.
Most of the time, this is something you can tolerate in your MongoDB database because, as much as possible, we try to
embed tightly linked (or related) data in the same document.
Consequently, two updates on the same document occur within a single transaction:
1 RS [direct: primary] test> db.product.updateOne({_id: "wine"},{$inc: {stock:1}, $set: {description : "It's the best wine on Earth"}}) 2 { 3 acknowledged: true, 4 insertedId: null, 5 matchedCount: 1, 6 modifiedCount: 1, 7 upsertedCount: 0 8 } 9 RS [direct: primary] test> db.product.findOne({_id: "wine"}) 10 { 11 _id: 'wine', 12 price: Decimal128("6.0000000000000000"), 13 stock: 4, 14 description: 'It's the best wine on Earth' 15 }
However, sometimes, you cannot model all of your related data in a single document, and there are a lot of valid reasons
for choosing not to embed documents.
Multi-document ACID transactions in MongoDB closely resemble what
you may already be familiar with in traditional relational databases.
MongoDB’s transactions are a conversational set of related operations that must atomically commit or fully roll back with
all-or-nothing execution.
Transactions are used to make sure operations are atomic even across multiple collections or databases. Consequently,
with snapshot isolation reads, another user can only observe either all the operations or none of them.
Let’s now add a shopping cart to our example.
For this example, two collections are required because we are dealing with two different business entities: the stock
management and the shopping cart each client can create during shopping. The lifecycles of each document in these
collections are different.
A document in the product collection represents an item I’m selling. This contains the current price of the product and
the current stock. I created a POJO to represent
it: Product.java.
1 { "_id" : "beer", "price" : NumberDecimal("3"), "stock" : NumberInt(5) }
A shopping cart is created when a client adds their first item in the cart and is removed when the client proceeds to
check out or leaves the website. I created a POJO to represent
it: Cart.java.
1 { 2 "_id" : "Alice", 3 "items" : [ 4 { 5 "price" : NumberDecimal("3"), 6 "productId" : "beer", 7 "quantity" : NumberInt(2) 8 } 9 ] 10 }
The challenge here resides in the fact that I cannot sell more than I possess: If I have five beers to sell, I cannot have
more than five beers distributed across the different client carts.
To ensure that, I have to make sure that the operation creating or updating the client cart is atomic with the stock
update. That’s where the multi-document transaction comes into play.
The transaction must fail in case someone tries to buy something I do not have in my stock. I will add a constraint
on the product stock:
1 db.product.drop() 2 db.createCollection("product", { 3 validator: { 4 $jsonSchema: { 5 bsonType: "object", 6 required: [ "_id", "price", "stock" ], 7 properties: { 8 _id: { 9 bsonType: "string", 10 description: "must be a string and is required" 11 }, 12 price: { 13 bsonType: "decimal", 14 minimum: 0, 15 description: "must be a non-negative decimal and is required" 16 }, 17 stock: { 18 bsonType: "int", 19 minimum: 0, 20 description: "must be a non-negative integer and is required" 21 } 22 } 23 } 24 } 25 })
Note that this is already included in the Java code of the
ChangeStreams
class.To monitor our example, we are going to use MongoDB Change Streams
that were introduced in MongoDB 3.6.
In ChangeStreams.java,
I am going to monitor the database
test
which contains our two collections. It'll print each
operation with its associated cluster time.1 package com.mongodb.quickstart.transactions; 2 3 import com.mongodb.ConnectionString; 4 import com.mongodb.MongoClientSettings; 5 import com.mongodb.client.MongoClient; 6 import com.mongodb.client.MongoClients; 7 import com.mongodb.client.MongoDatabase; 8 import com.mongodb.client.model.CreateCollectionOptions; 9 import com.mongodb.client.model.ValidationAction; 10 import com.mongodb.client.model.ValidationOptions; 11 import org.bson.BsonDocument; 12 13 import static com.mongodb.client.model.changestream.FullDocument.UPDATE_LOOKUP; 14 15 public class ChangeStreams { 16 17 private static final String CART = "cart"; 18 private static final String PRODUCT = "product"; 19 20 public static void main(String[] args) { 21 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri")); 22 MongoClientSettings clientSettings = MongoClientSettings.builder() 23 .applyConnectionString(connectionString) 24 .build(); 25 try (MongoClient client = MongoClients.create(clientSettings)) { 26 MongoDatabase db = client.getDatabase("test"); 27 System.out.println("Dropping the '" + db.getName() + "' database."); 28 db.drop(); 29 System.out.println("Creating the '" + CART + "' collection."); 30 db.createCollection(CART); 31 System.out.println("Creating the '" + PRODUCT + "' collection with a JSON Schema."); 32 db.createCollection(PRODUCT, productJsonSchemaValidator()); 33 System.out.println("Watching the collections in the DB " + db.getName() + "..."); 34 db.watch() 35 .fullDocument(UPDATE_LOOKUP) 36 .forEach(doc -> System.out.println(doc.getClusterTime() + " => " + doc.getFullDocument())); 37 } 38 } 39 40 private static CreateCollectionOptions productJsonSchemaValidator() { 41 String jsonSchema = """ 42 { 43 "$jsonSchema": { 44 "bsonType": "object", 45 "required": ["_id", "price", "stock"], 46 "properties": { 47 "_id": { 48 "bsonType": "string", 49 "description": "must be a string and is required" 50 }, 51 "price": { 52 "bsonType": "decimal", 53 "minimum": 0, 54 "description": "must be a non-negative decimal and is required" 55 }, 56 "stock": { 57 "bsonType": "int", 58 "minimum": 0, 59 "description": "must be a non-negative integer and is required" 60 } 61 } 62 } 63 }"""; 64 return new CreateCollectionOptions().validationOptions( 65 new ValidationOptions().validationAction(ValidationAction.ERROR) 66 .validator(BsonDocument.parse(jsonSchema))); 67 } 68 }
In this example, we have five beers to sell.
Alice wants to buy two beers, but we are not going to use a multi-document transaction for this. We will
observe in the change streams two operations at two different cluster times:
- One creating the cart
- One updating the stock
Then, Alice adds two more beers to her cart, and we are going to use a transaction this time. The result in the change
stream will be two operations happening at the same cluster time.
Finally, she will try to order two extra beers but the jsonSchema validator will fail the product update (as there is only
one in stock) and result in a
rollback. We will not see anything in the change stream.
Below is the source code
for Transaction.java:
1 package com.mongodb.quickstart.transactions; 2 3 import com.mongodb.*; 4 import com.mongodb.client.*; 5 import com.mongodb.quickstart.transactions.models.Cart; 6 import com.mongodb.quickstart.transactions.models.Product; 7 import org.bson.BsonDocument; 8 import org.bson.codecs.configuration.CodecRegistry; 9 import org.bson.codecs.pojo.PojoCodecProvider; 10 import org.bson.conversions.Bson; 11 12 import java.math.BigDecimal; 13 import java.util.ArrayList; 14 import java.util.Collections; 15 import java.util.List; 16 17 import static com.mongodb.client.model.Filters.*; 18 import static com.mongodb.client.model.Updates.inc; 19 import static org.bson.codecs.configuration.CodecRegistries.fromProviders; 20 import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; 21 22 public class Transactions { 23 24 private static final BigDecimal BEER_PRICE = BigDecimal.valueOf(3); 25 private static final String BEER_ID = "beer"; 26 private static final Bson filterId = eq("_id", BEER_ID); 27 private static final Bson filterAlice = eq("_id", "Alice"); 28 private static final Bson matchBeer = elemMatch("items", eq("productId", "beer")); 29 private static final Bson incrementTwoBeers = inc("items.$.quantity", 2); 30 private static final Bson decrementTwoBeers = inc("stock", -2); 31 private static MongoCollection<Cart> cartCollection; 32 private static MongoCollection<Product> productCollection; 33 34 public static void main(String[] args) { 35 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri")); 36 CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build()); 37 CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry); 38 MongoClientSettings clientSettings = MongoClientSettings.builder() 39 .applyConnectionString(connectionString) 40 .codecRegistry(codecRegistry) 41 .build(); 42 try (MongoClient client = MongoClients.create(clientSettings)) { 43 MongoDatabase db = client.getDatabase("test"); 44 cartCollection = db.getCollection("cart", Cart.class); 45 productCollection = db.getCollection("product", Product.class); 46 transactionsDemo(client); 47 } 48 } 49 50 private static void transactionsDemo(MongoClient client) { 51 clearCollections(); 52 insertProductBeer(); 53 printDatabaseState(); 54 System.out.println(""" 55 ######### NO TRANSACTION ######### 56 Alice wants 2 beers. 57 We have to create a cart in the 'cart' collection and update the stock in the 'product' collection. 58 The 2 actions are correlated but can not be executed at the same cluster time. 59 Any error blocking one operation could result in stock error or a sale of beer that we can't fulfill as we have no stock. 60 ------------------------------------"""); 61 aliceWantsTwoBeers(); 62 sleep(); 63 removingBeersFromStock(); 64 System.out.println("####################################\n"); 65 printDatabaseState(); 66 sleep(); 67 System.out.println(""" 68 ######### WITH TRANSACTION ######### 69 Alice wants 2 extra beers. 70 Now we can update the 2 collections simultaneously. 71 The 2 operations only happen when the transaction is committed. 72 ------------------------------------"""); 73 aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(client); 74 sleep(); 75 System.out.println(""" 76 ######### WITH TRANSACTION ######### 77 Alice wants 2 extra beers. 78 This time we do not have enough beers in stock so the transaction will rollback. 79 ------------------------------------"""); 80 aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(client); 81 } 82 83 private static void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(MongoClient client) { 84 ClientSession session = client.startSession(); 85 try { 86 session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build()); 87 aliceWantsTwoExtraBeers(session); 88 sleep(); 89 removingBeerFromStock(session); 90 session.commitTransaction(); 91 } catch (MongoException e) { 92 session.abortTransaction(); 93 System.out.println("####### ROLLBACK TRANSACTION #######"); 94 } finally { 95 session.close(); 96 System.out.println("####################################\n"); 97 printDatabaseState(); 98 } 99 } 100 101 private static void removingBeersFromStock() { 102 System.out.println("Trying to update beer stock : -2 beers."); 103 try { 104 productCollection.updateOne(filterId, decrementTwoBeers); 105 } catch (MongoException e) { 106 System.out.println("######## MongoException ########"); 107 System.out.println("##### STOCK CANNOT BE NEGATIVE #####"); 108 throw e; 109 } 110 } 111 112 private static void removingBeerFromStock(ClientSession session) { 113 System.out.println("Trying to update beer stock : -2 beers."); 114 try { 115 productCollection.updateOne(session, filterId, decrementTwoBeers); 116 } catch (MongoException e) { 117 System.out.println("######## MongoException ########"); 118 System.out.println("##### STOCK CANNOT BE NEGATIVE #####"); 119 throw e; 120 } 121 } 122 123 private static void aliceWantsTwoBeers() { 124 System.out.println("Alice adds 2 beers in her cart."); 125 cartCollection.insertOne(new Cart("Alice", List.of(new Cart.Item(BEER_ID, 2, BEER_PRICE)))); 126 } 127 128 private static void aliceWantsTwoExtraBeers(ClientSession session) { 129 System.out.println("Updating Alice cart : adding 2 beers."); 130 cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementTwoBeers); 131 } 132 133 private static void insertProductBeer() { 134 productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE)); 135 } 136 137 private static void clearCollections() { 138 productCollection.deleteMany(new BsonDocument()); 139 cartCollection.deleteMany(new BsonDocument()); 140 } 141 142 private static void printDatabaseState() { 143 System.out.println("Database state:"); 144 printProducts(productCollection.find().into(new ArrayList<>())); 145 printCarts(cartCollection.find().into(new ArrayList<>())); 146 System.out.println(); 147 } 148 149 private static void printProducts(List<Product> products) { 150 products.forEach(System.out::println); 151 } 152 153 private static void printCarts(List<Cart> carts) { 154 if (carts.isEmpty()) { 155 System.out.println("No carts..."); 156 } else { 157 carts.forEach(System.out::println); 158 } 159 } 160 161 private static void sleep() { 162 System.out.println("Sleeping 1 second..."); 163 try { 164 Thread.sleep(1000); 165 } catch (InterruptedException e) { 166 System.err.println("Oops!"); 167 e.printStackTrace(); 168 } 169 } 170 }
Here is the console of the change stream:
1 Dropping the 'test' database. 2 Creating the 'cart' collection. 3 Creating the 'product' collection with a JSON Schema. 4 Watching the collections in the DB test... 5 Timestamp{value=7304460075832180737, seconds=1700702141, inc=1} => Document{{_id=beer, price=3, stock=5}} 6 Timestamp{value=7304460075832180738, seconds=1700702141, inc=2} => Document{{_id=Alice, items=[Document{{price=3, productId=beer, quantity=2}}]}} 7 Timestamp{value=7304460080127148033, seconds=1700702142, inc=1} => Document{{_id=beer, price=3, stock=3}} 8 Timestamp{value=7304460088717082625, seconds=1700702144, inc=1} => Document{{_id=Alice, items=[Document{{price=3, productId=beer, quantity=4}}]}} 9 Timestamp{value=7304460088717082625, seconds=1700702144, inc=1} => Document{{_id=beer, price=3, stock=1}}
As you can see here, we only get five operations because the two last operations were never committed to the database,
and therefore, the change stream has nothing to show.
- The first operation is the product collection initialization (create the product document for the beers).
- The second and third operations are the first two beers Alice adds to her cart without a multi-doc transaction. Notice that the two operations do not happen at the same cluster time.
- The two last operations are the two additional beers Alice adds to her cart with a multi-doc transaction. Notice that this time the two operations are atomic, and they are happening exactly at the same cluster time.
Here is the console of the transaction Java process that sums up everything I said earlier.
1 Database state: 2 Product{id='beer', stock=5, price=3} 3 No carts... 4 5 ######### NO TRANSACTION ######### 6 Alice wants 2 beers. 7 We have to create a cart in the 'cart' collection and update the stock in the 'product' collection. 8 The 2 actions are correlated but can not be executed on the same cluster time. 9 Any error blocking one operation could result in stock error or a sale of beer that we can't fulfill as we have no stock. 10 ------------------------------------ 11 Alice adds 2 beers in her cart. 12 Sleeping 1 second... 13 Trying to update beer stock : -2 beers. 14 #################################### 15 16 Database state: 17 Product{id='beer', stock=3, price=3} 18 Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]} 19 20 Sleeping 1 second... 21 ######### WITH TRANSACTION ######### 22 Alice wants 2 extra beers. 23 Now we can update the 2 collections simultaneously. 24 The 2 operations only happen when the transaction is committed. 25 ------------------------------------ 26 Updating Alice cart : adding 2 beers. 27 Sleeping 1 second... 28 Trying to update beer stock : -2 beers. 29 #################################### 30 31 Database state: 32 Product{id='beer', stock=1, price=3} 33 Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]} 34 35 Sleeping 1 second... 36 ######### WITH TRANSACTION ######### 37 Alice wants 2 extra beers. 38 This time we do not have enough beers in stock so the transaction will rollback. 39 ------------------------------------ 40 Updating Alice cart : adding 2 beers. 41 Sleeping 1 second... 42 Trying to update beer stock : -2 beers. 43 ######## MongoException ######## 44 ##### STOCK CANNOT BE NEGATIVE ##### 45 ####### ROLLBACK TRANSACTION ####### 46 #################################### 47 48 Database state: 49 Product{id='beer', stock=1, price=3} 50 Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}
Thanks for taking the time to read my post. I hope you found it useful and interesting.
As a reminder, all the code is
available on the GitHub repository
for you to experiment.
If you're seeking an easy way to begin with MongoDB, you can achieve that in just five clicks using
our MongoDB Atlas cloud database service.