Transactions

In MongoDB, an operation on a single document is atomic. Because you can use embedded documents and arrays to capture relationships between data in a single document structure instead of normalizing across multiple documents and collections, this single-document atomicity obviates the need for distributed transactions for many practical use cases.
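As a rough illustration of the point above, the sketch below keeps an order and its line items in one document, so a single update changes both the item list and the total atomically. The document shape, the field names, and the helper `add_item_update` are hypothetical and chosen only for illustration; `$push` and `$inc` are standard update operators, and the resulting filter and update documents are the kind a driver's update-one method accepts.

```python
# Hypothetical order document: the line items are embedded in an array,
# so the order and its items live in a single document.
order = {
    "_id": 1001,
    "customer": "alice",
    "total": 25,
    "items": [{"sku": "pen", "qty": 5, "price": 5}],
}


def add_item_update(order_id, sku, qty, price):
    """Build the filter and update for appending an item and adjusting the total.

    Both modifications target the same document, so they are applied as one
    atomic single-document write and no transaction is needed.
    """
    filter_doc = {"_id": order_id}
    update_doc = {
        "$push": {"items": {"sku": sku, "qty": qty, "price": price}},
        "$inc": {"total": qty * price},
    }
    return filter_doc, update_doc


filter_doc, update_doc = add_item_update(1001, "ink", 2, 10)
```

Had the items been stored in a separate collection, keeping the total consistent with them would require writing to two documents, which is the case distributed transactions address.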

For situations that require atomicity of reads and writes to multiple documents (in a single or multiple collections), MongoDB supports distributed transactions. Distributed transactions can span multiple operations, collections, databases, documents, and shards.


This example highlights the key components of the transactions API.

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
var result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors.

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb2").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances.
For example:
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody<String> txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
    collection_one = my_session.client.mydb1.foo
    collection_two = my_session.client.mydb2.bar
    # Important: You must pass the session to the operations.
    await collection_one.insert_one({"abc": 1}, session=my_session)
    await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    await session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

This example uses the core API. Because the core API does not incorporate retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors, the example includes explicit logic to retry the transaction for these errors:

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted. Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ...\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed.\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ...\n");
redo LOOP;
}
else {
print("Error during commit ...\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transaction
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting.\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
    collection_one = session.client.mydb1.foo
    collection_two = session.client.mydb2.bar
    # Important: You must pass the session to the operations.
    collection_one.insert_one({"abc": 1}, session=session)
    collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )

The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uri_string = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
# Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
let client = Client::with_uri_str(uri).await?;
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client
.database("mydb1")
.collection::<Document>("foo")
.insert_one(doc! { "abc": 0})
.await?;
client
.database("mydb2")
.collection::<Document>("bar")
.insert_one(doc! { "xyz": 0})
.await?;
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transaction.
async fn callback(session: &mut ClientSession) -> Result<()> {
let collection_one = session
.client()
.database("mydb1")
.collection::<Document>("foo");
let collection_two = session
.client()
.database("mydb2")
.collection::<Document>("bar");
// Important: You must pass the session to the operations.
collection_one
.insert_one(doc! { "abc": 1 })
.session(&mut *session)
.await?;
collection_two
.insert_one(doc! { "xyz": 999 })
.session(session)
.await?;
Ok(())
}
// Step 2: Start a client session.
let mut session = client.start_session().await?;
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
// abort on error).
session
.start_transaction()
.and_run((), |session, _| callback(session).boxed())
.await?;

This example uses the core API. Because the core API does not incorporate retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors, the example includes explicit logic to retry the transaction for these errors:

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}
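The retry pattern that the core API examples implement by hand can be summarized with a small, driver-free Python sketch. Everything here is a stand-in: `FakeTxnError` and `FakeSession` are hypothetical classes that only imitate an exception carrying an error label and a session that fails a scripted number of times. The two retry functions mirror the structure of the Perl example: the whole transaction is rerun on "TransientTransactionError", and only the commit is rerun on "UnknownTransactionCommitResult".

```python
class FakeTxnError(Exception):
    """Hypothetical stand-in for a driver exception that carries an error label."""

    def __init__(self, label):
        super().__init__(label)
        self._label = label

    def has_error_label(self, label):
        return self._label == label


def commit_with_retry(session):
    # Retry only the commit while its outcome is unknown.
    while True:
        try:
            session.commit_transaction()
            return
        except FakeTxnError as exc:
            if exc.has_error_label("UnknownTransactionCommitResult"):
                continue
            raise


def run_transaction_with_retry(txn_func, session):
    # Rerun the whole transaction function after a transient error.
    while True:
        try:
            txn_func(session)
            return
        except FakeTxnError as exc:
            if exc.has_error_label("TransientTransactionError"):
                continue
            raise


class FakeSession:
    """Scripted session: raises the queued labels in order, then succeeds."""

    def __init__(self, body_failures, commit_failures):
        self.body_failures = list(body_failures)
        self.commit_failures = list(commit_failures)
        self.log = []

    def start_transaction(self):
        self.log.append("start")

    def do_writes(self):
        if self.body_failures:
            raise FakeTxnError(self.body_failures.pop(0))
        self.log.append("writes")

    def abort_transaction(self):
        self.log.append("abort")

    def commit_transaction(self):
        if self.commit_failures:
            raise FakeTxnError(self.commit_failures.pop(0))
        self.log.append("commit")


def update_employee_info(session):
    session.start_transaction()
    try:
        session.do_writes()
    except Exception:
        session.abort_transaction()
        raise
    commit_with_retry(session)


session = FakeSession(
    body_failures=["TransientTransactionError"],
    commit_failures=["UnknownTransactionCommitResult"],
)
run_transaction_with_retry(update_employee_info, session)
print(session.log)
```

The loops in this sketch are unbounded for brevity; real code would normally cap the number of attempts or the total time spent retrying.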

Tip

See also:

For an example in mongo shell, see mongo Shell Example.

For situations that require atomicity of reads and writes to multiple documents (in a single or multiple collections), MongoDB supports distributed transactions, including transactions on replica sets and sharded clusters.

Distributed transactions are atomic:

  • Transactions either apply all data changes or roll back the changes.

  • If a transaction commits, all data changes made in the transaction are saved and are visible outside of the transaction.

    Until a transaction commits, the data changes made in the transaction are not visible outside the transaction.

    However, when a transaction writes to multiple shards, not all outside read operations need to wait for the result of the committed transaction to be visible across the shards. For example, if a transaction is committed and write 1 is visible on shard A but write 2 is not yet visible on shard B, an outside read at read concern "local" can read the results of write 1 without seeing write 2.

  • When a transaction aborts, all data changes made in the transaction are discarded without ever becoming visible. For example, if any operation in the transaction fails, the transaction aborts and none of its changes are applied.
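The all-or-nothing behavior described in the list above can be pictured with a toy in-memory model. This is only a conceptual sketch and says nothing about how MongoDB implements transactions; `ToyStore` and `ToyTransaction` are hypothetical names. Writes are staged privately, become visible together on commit, and disappear on abort.

```python
class ToyStore:
    """In-memory stand-in for a database (illustrative only)."""

    def __init__(self):
        self.committed = {}


class ToyTransaction:
    def __init__(self, store):
        self.store = store
        self.pending = {}  # writes staged by this transaction only

    def write(self, key, value):
        self.pending[key] = value

    def read(self, key):
        # The transaction sees its own staged writes first.
        if key in self.pending:
            return self.pending[key]
        return self.store.committed.get(key)

    def commit(self):
        # Apply every staged write together.
        self.store.committed.update(self.pending)
        self.pending = {}

    def abort(self):
        # Discard every staged write; nothing becomes visible.
        self.pending = {}


store = ToyStore()

txn = ToyTransaction(store)
txn.write("abc", 1)
txn.write("xyz", 999)
visible_before_commit = dict(store.committed)  # outside view before commit
txn.commit()
visible_after_commit = dict(store.committed)

aborted = ToyTransaction(store)
aborted.write("abc", 2)
aborted.abort()
visible_after_abort = dict(store.committed)
```

The model deliberately ignores the sharded case noted above, where an outside read at read concern "local" may observe one shard's committed write before another's.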

Important

In most cases, a distributed transaction incurs a greater performance cost than single-document writes, and the availability of distributed transactions should not be a replacement for effective schema design. For many scenarios, the denormalized data model (embedded documents and arrays) will continue to be optimal for your data and use cases. That is, for many scenarios, modeling your data appropriately will minimize the need for distributed transactions.

For additional transactions usage considerations (such as runtime limit and oplog size limit), see also Production Considerations.

Distributed transactions can be used across multiple operations, collections, databases, documents, and shards.

For transactions:

  • You can create collections and indexes in transactions. For details, see Create Collections and Indexes in a Transaction.

  • The collections used in a transaction can be in different databases.

    Note

    You cannot create new collections in cross-shard write transactions. For example, if you write to an existing collection in one shard and implicitly create a collection in a different shard, MongoDB cannot perform both operations in the same transaction.

  • You cannot write to capped collections.

  • You cannot read from or write to collections in the config, admin, or local databases.

  • You cannot write to system.* collections.

  • You cannot return the supported operation's query plan using explain or similar commands.

  • For cursors created outside of a transaction, you cannot call getMore inside the transaction.

  • For cursors created in a transaction, you cannot call getMore outside the transaction.

For a list of operations not supported in transactions, see Restricted Operations.

Tip

When creating or dropping a collection immediately before starting a transaction, if the collection is accessed within the transaction, issue the create or drop operation with write concern "majority" to ensure that the transaction can acquire the required locks.

You can perform the following operations in a distributed transaction if the transaction is not a cross-shard write transaction:

  • Create collections.

  • Create indexes on new empty collections created earlier in the same transaction.

When creating a collection inside a transaction:

When creating an index inside a transaction [1], the index to create must be on either:

  • a non-existent collection. The collection is created as part of the operation.

  • a new empty collection created earlier in the same transaction.

[1] You can also run db.collection.createIndex() and db.collection.createIndexes() on existing indexes to check for existence. These operations return successfully without creating the index.

  • You cannot create new collections in cross-shard write transactions. For example, if you write to an existing collection in one shard and implicitly create a collection in a different shard, MongoDB cannot perform both operations in the same transaction.

  • For explicit creation of a collection or an index inside a transaction, the transaction read concern level must be "local".

    To explicitly create collections and indexes, use the following commands and methods:

    Command
    Method
  • The parameter shouldMultiDocTxnCreateCollectionAndIndexes must be true (the default). When setting the parameter for a sharded cluster, set the parameter on all shards.

To perform a count operation within a transaction, use the $count aggregation stage or the $group (with a $sum expression) aggregation stage.

MongoDB drivers provide a collection-level API countDocuments(filter, options) as a helper method that uses the $group with a $sum expression to perform a count. The count() API is deprecated.

mongo shell provides the db.collection.countDocuments() helper method that uses the $group with a $sum expression to perform a count.
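As a rough illustration of the count guidance above, the following sketch builds an aggregation pipeline that counts matching documents with $group and a $sum expression. The helper name buildCountPipeline and the output field name n are hypothetical and chosen only for this example. The code constructs the pipeline array and does not contact a server.

```javascript
// Hypothetical helper (not part of any driver): builds a pipeline that
// counts the documents matching `filter` using $group with $sum.
function buildCountPipeline(filter) {
  return [
    { $match: filter },
    { $group: { _id: null, n: { $sum: 1 } } }
  ];
}

// Print the pipeline so it can be inspected or pasted into a shell session.
console.log(JSON.stringify(buildCountPipeline({ status: "A" })));
```

In a shell session, the resulting array could be passed to db.coll.aggregate() inside the transaction. When no documents match, the $group stage emits no document, so an empty cursor should be read as a count of 0.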

To perform a distinct operation within a transaction:

  • For unsharded collections, you can use the db.collection.distinct() method or the distinct command, as well as the aggregation pipeline with the $group stage.

  • For sharded collections, you cannot use the db.collection.distinct() method or the distinct command.

    To find the distinct values for a sharded collection, use the aggregation pipeline with the $group stage instead. For example:

    • Instead of db.coll.distinct("x"), use:

      db.coll.aggregate([
         { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
         { $project: { _id: 0 } }
      ])

    • Instead of db.coll.distinct("x", { status: "A" }), use:

      db.coll.aggregate([
         { $match: { status: "A" } },
         { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
         { $project: { _id: 0 } }
      ])

    The pipeline returns a cursor to a document:

    { "distinctValues" : [ 2, 3, 1 ] }

    Iterate the cursor to access the results document.
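The two replacements above follow the same pattern, so they can be generated from a field name and an optional filter. The sketch below is one way to do that. The helper name buildDistinctPipeline is hypothetical, and the function only builds the pipeline array shown in the examples above.

```javascript
// Hypothetical helper: builds the $group/$addToSet pipeline from the
// examples above for a field name and an optional query filter.
function buildDistinctPipeline(field, filter) {
  const stages = [];
  // Add a $match stage only when a non-empty filter is supplied.
  if (filter && Object.keys(filter).length > 0) {
    stages.push({ $match: filter });
  }
  stages.push({ $group: { _id: null, distinctValues: { $addToSet: "$" + field } } });
  stages.push({ $project: { _id: 0 } });
  return stages;
}

console.log(JSON.stringify(buildDistinctPipeline("x", { status: "A" })));
```

The returned array can be passed to db.coll.aggregate() in place of the corresponding db.coll.distinct() call.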

Informational commands, such as hello, buildInfo, connectionStatus (and their helper methods) are allowed in transactions; however, they cannot be the first operation in the transaction.

Changed in version 4.4.

Some operations are not allowed in transactions; see Restricted Operations.

The following rules govern transactions and sessions:

  • Transactions are associated with a session.

  • You can have at most one open transaction at a time for a session.

  • When using the drivers, each operation in the transaction must be associated with the session. Refer to your driver-specific documentation for details.

  • If a session ends and it has an open transaction, the transaction aborts.

Operations in a transaction use the transaction-level read preference.

Using the drivers, you can set the transaction-level read preference at the transaction start:

  • If the transaction-level read preference is unset, the transaction uses the session-level read preference.

  • If both the transaction-level and the session-level read preference are unset, the transaction uses the client-level read preference. By default, the client-level read preference is primary.

Distributed transactions that contain read operations must use read preference primary. All operations in a given transaction must route to the same member.

Operations in a transaction use the transaction-level read concern. This means a read concern set at the collection and database level is ignored inside the transaction.

You can set the transaction-level read concern at the transaction start.

  • If the transaction-level read concern is unset, the transaction-level read concern defaults to the session-level read concern.

  • If both the transaction-level and the session-level read concern are unset, the transaction-level read concern defaults to the client-level read concern. By default, the client-level read concern is "local" for reads on the primary.

Transactions support the following read concern levels:

  • Read concern "local" returns the most recent data available from the node but can be rolled back.

  • For transactions on sharded clusters, "local" read concern cannot guarantee that the data is from the same snapshot view across the shards. If snapshot isolation is required, use "snapshot" read concern.

  • You can create collections and indexes inside a transaction. If explicitly creating a collection or an index, the transaction must use read concern "local". If you implicitly create a collection, you can use any of the read concerns available for transactions.

  • If the transaction commits with write concern "majority", read concern "majority" returns data that has been acknowledged by a majority of the replica set members and can't be rolled back. Otherwise, read concern "majority" provides no guarantees that read operations read majority-committed data.

  • For transactions on sharded clusters, read concern "majority" can't guarantee that the data is from the same snapshot view across the shards. If snapshot isolation is required, use read concern "snapshot".

  • Read concern "snapshot" returns data from a snapshot of majority committed data if the transaction commits with write concern "majority".

  • If the transaction does not use write concern "majority" for the commit, the "snapshot" read concern provides no guarantee that read operations used a snapshot of majority-committed data.

  • For transactions on sharded clusters, the "snapshot" view of the data is synchronized across shards.

Transactions use the transaction-level write concern to commit the write operations. Write operations inside transactions must be run without an explicit write concern specification and use the default write concern. At commit time, the writes are committed using the transaction-level write concern.

Tip

Don't explicitly set the write concern for the individual write operations inside a transaction. Setting write concerns for the individual write operations inside a transaction returns an error.

You can set the transaction-level write concern at the transaction start:

  • If the transaction-level write concern is unset, the transaction-level write concern defaults to the session-level write concern for the commit.

  • If the transaction-level write concern and the session-level write concern are unset, transaction-level write concern defaults to the client-level write concern. By default, client-level write concern is w: 1. See also Default MongoDB Read Concerns/Write Concerns.
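The read preference, read concern, and write concern sections above all describe the same fallback order: transaction level, then session level, then client level, then the documented default. The sketch below models that order as a plain function. It is an illustration of the precedence only, not driver code, and the name resolveTransactionOption and its parameters are hypothetical.

```javascript
// Hypothetical model of the fallback order described above: return the first
// value that is set, checking the transaction level, then the session level,
// then the client level, and finally the documented default.
function resolveTransactionOption(txnValue, sessionValue, clientValue, documentedDefault) {
  for (const value of [txnValue, sessionValue, clientValue]) {
    if (value !== undefined && value !== null) {
      return value;
    }
  }
  return documentedDefault;
}

// Write concern: unset on the transaction, set on the session.
console.log(resolveTransactionOption(undefined, { w: "majority" }, { w: 1 }, { w: 1 }));

// Read concern: nothing set at any level, so the documented default applies.
console.log(resolveTransactionOption(undefined, undefined, undefined, { level: "local" }));
```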

Transactions support all write concern w values, including:

  • Write concern w: 1 returns acknowledgement after the commit has been applied to the primary.

    Important

    When you commit with w: 1, your transaction can be rolled back if there is a failover.

  • When you commit with w: 1 write concern, transaction-level "majority" read concern provides no guarantees that read operations in the transaction read majority-committed data.

  • When you commit with w: 1 write concern, transaction-level "snapshot" read concern provides no guarantee that read operations in the transaction used a snapshot of majority-committed data.

  • Write concern w: "majority" returns acknowledgement after the commit has been applied to a majority of voting members.

  • When you commit with w: "majority" write concern, transaction-level "majority" read concern guarantees that operations have read majority-committed data. For transactions on sharded clusters, this view of the majority-committed data is not synchronized across shards.

  • When you commit with w: "majority" write concern, transaction-level "snapshot" read concern guarantees that operations have read from a synchronized snapshot of majority-committed data.
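The interaction between the transaction-level read concern and the commit write concern can be summarized as a small lookup. The sketch below restates the guarantees listed above in code form. The function name readGuarantee and the wording of the returned strings are hypothetical, and the function does not query a deployment.

```javascript
// Hypothetical lookup that restates the guarantees listed above.
// readConcernLevel: "local", "majority", or "snapshot".
// commitWriteConcernW: the w value used for the commit, e.g. 1 or "majority".
function readGuarantee(readConcernLevel, commitWriteConcernW) {
  const supported = ["local", "majority", "snapshot"];
  if (!supported.includes(readConcernLevel)) {
    throw new Error("unsupported read concern level: " + readConcernLevel);
  }
  if (readConcernLevel === "local") {
    return "most recent data available from the node; can be rolled back";
  }
  // "majority" and "snapshot" only give guarantees with a majority commit.
  if (commitWriteConcernW !== "majority") {
    return "no guarantee that reads used majority-committed data";
  }
  if (readConcernLevel === "majority") {
    return "majority-committed data; view not synchronized across shards";
  }
  return "synchronized snapshot of majority-committed data";
}

console.log(readGuarantee("snapshot", "majority"));
console.log(readGuarantee("majority", 1));
```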

Note

Regardless of the write concern specified for the transaction, the commit operation for a sharded cluster transaction includes some parts that use {w: "majority", j: true} write concern.

If you specify a "majority" write concern for a multi-document transaction and the transaction fails to replicate to the calculated majority of replica set members, then the transaction may not immediately roll back on replica set members. The replica set will be eventually consistent. A transaction is always applied or rolled back on all replica set members.

Regardless of the write concern specified for the transaction, the driver applies w: "majority" as the write concern when retrying commitTransaction.

The following sections describe additional considerations for transactions.

For transactions in production environments, see Production Considerations. In addition, for sharded clusters, see Production Considerations (Sharded Clusters).

Transactions whose write operations span multiple shards will error and abort if any transaction operation reads from or writes to a shard that contains an arbiter.

See also Disabled Read Concern Majority for transaction restrictions on shards that have disabled read concern majority.

A 3-member PSA (Primary-Secondary-Arbiter) replica set or a sharded cluster with 3-member PSA shards may have disabled read concern majority (--enableMajorityReadConcern false or replication.enableMajorityReadConcern: false).

On sharded clusters:

  • If a transaction involves a shard that has disabled read concern "majority", you cannot use read concern "snapshot" for the transaction. You can only use read concern "local" or "majority" for the transaction. If you use read concern "snapshot", the transaction errors and aborts:

    readConcern level 'snapshot' is not supported in sharded clusters when enableMajorityReadConcern=false.

  • Transactions whose write operations span multiple shards will error and abort if any of the transaction's read or write operations involves a shard that has disabled read concern "majority".

On a replica set, you can specify read concern "local", "majority", or "snapshot" even if the replica set has disabled read concern "majority".

However, if you are planning to transition to a sharded cluster with disabled read concern majority shards, avoid using read concern "snapshot".

Tip

To check if read concern "majority" is disabled, you can run db.serverStatus() on the mongod instances and check the storageEngine.supportsCommittedReads field. If false, read concern "majority" is disabled.

For more information, see 3-Member Primary-Secondary-Arbiter Architecture and Three Member Primary-Secondary-Arbiter Shards.
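The rules above can be sketched as two small functions: one that interprets the storageEngine.supportsCommittedReads field of a db.serverStatus() result, and one that lists the read concern levels a transaction may use. Both function names and the deployment labels "replicaSet" and "shardedCluster" are hypothetical. The code works on a plain object and does not run the command itself.

```javascript
// Hypothetical helper: takes the document returned by db.serverStatus() and
// reports whether read concern "majority" is disabled on that mongod.
function isMajorityReadConcernDisabled(serverStatusDoc) {
  const engine = serverStatusDoc && serverStatusDoc.storageEngine;
  return Boolean(engine) && engine.supportsCommittedReads === false;
}

// Hypothetical helper: read concern levels usable in a transaction, given the
// deployment type and whether an involved shard or the replica set has
// disabled read concern "majority".
function allowedTransactionReadConcerns(deployment, majorityDisabled) {
  if (deployment === "shardedCluster" && majorityDisabled) {
    return ["local", "majority"];
  }
  return ["local", "majority", "snapshot"];
}

// Example with a hand-written status document standing in for real output.
const sampleStatus = { storageEngine: { supportsCommittedReads: false } };
const disabled = isMajorityReadConcernDisabled(sampleStatus);
console.log(allowedTransactionReadConcerns("shardedCluster", disabled));
```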

You cannot run transactions on a sharded cluster that has a shard with writeConcernMajorityJournalDefault set to false (such as a shard with a voting member that uses the in-memory storage engine).

Note

Regardless of the write concern specified for the transaction, the commit operation for a sharded cluster transaction includes some parts that use {w: "majority", j: true} write concern.

To obtain transaction status and metrics, use the following methods:

Source
Returns
Returns transactions metrics.
$currentOp aggregation pipeline

Returns:

Returns:

mongod and mongos log messages
Includes information on slow transactions (which are transactions that exceed the operationProfiling.slowOpThresholdMs threshold) in the TXN log component.

To use transactions, the featureCompatibilityVersion for all members of the deployment must be at least:

Deployment         Minimum featureCompatibilityVersion
Replica Set        4.0
Sharded Cluster    4.2

To check the fCV for a member, connect to the member and run the following command:

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

For more information, see the setFeatureCompatibilityVersion reference page.
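Once the fCV value is known, comparing it against the minimums listed above is a simple version check. The sketch below shows one way to do it. The names MIN_FCV, compareVersions, and fcvSupportsTransactions are hypothetical, and the function expects the version as a string such as "4.2". How that string is extracted from the command result is left to the caller and is an assumption of this example.

```javascript
// Minimum featureCompatibilityVersion per deployment type, from the table above.
const MIN_FCV = { replicaSet: "4.0", shardedCluster: "4.2" };

// Compare dotted version strings numerically, component by component.
// Returns a negative number, zero, or a positive number.
function compareVersions(a, b) {
  const pa = a.split(".").map(Number);
  const pb = b.split(".").map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] || 0) - (pb[i] || 0);
    if (diff !== 0) {
      return diff;
    }
  }
  return 0;
}

// Hypothetical helper: true if `fcv` meets the minimum for the deployment type.
function fcvSupportsTransactions(deployment, fcv) {
  const minimum = MIN_FCV[deployment];
  if (minimum === undefined) {
    throw new Error("unknown deployment type: " + deployment);
  }
  return compareVersions(fcv, minimum) >= 0;
}

console.log(fcvSupportsTransactions("replicaSet", "4.0"));
console.log(fcvSupportsTransactions("shardedCluster", "4.0"));
```

The check must hold for every member of the deployment, so in practice it would be repeated for each member's reported value.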

Distributed transactions are supported on replica sets and sharded clusters where:

  • the primary uses the WiredTiger storage engine, and

  • the secondary members use either the WiredTiger storage engine or the in-memory storage engine.

Note

You cannot run transactions on a sharded cluster that has a shard with writeConcernMajorityJournalDefault set to false, such as a shard with a voting member that uses the in-memory storage engine.
