Menu Docs
Página inicial do Docs
/
Manual do MongoDB

Transações

Nesta página

  • API de transações
  • Transações e Atomicidade
  • Transações e Operações
  • Transações e Sessões
  • Read Concern/Write Concern/Preferência de leitura
  • Informações gerais
  • Tópicos adicionais de transações

No MongoDB, uma operação em um único documento é atômica. Como é possível usar documentos e arrays incorporados para capturar relacionamentos entre dados em uma única estrutura de documento, em vez de normalizar vários documentos e collections, essa atomicidade de documento único evita a necessidade de transações distribuídas para muitos casos de uso práticos.

Para situações que exigem atomicidade de leituras e gravações em vários documentos (em uma collection única ou múltipla), o MongoDB suporta transações distribuídas. Com transações distribuídas, as transações podem ser usadas em diversas operações, collections, bancos de dados, documentos e shards.

As informações nesta página se aplicam a sistemas hospedados nos seguintes ambientes:

  • MongoDB Atlas: o serviço totalmente gerenciado para implantações MongoDB na nuvem

  • MongoDB Enterprise: a versão autogerenciada e baseada em assinatura do MongoDB

  • MongoDB Community: uma versão com código disponível, de uso gratuito e autogerenciada do MongoDB


➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem do exemplo a seguir.


Este exemplo destaca os principais componentes da API de transações.

O exemplo usa a nova API de chamada de resposta para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para TransientTransactionError ou UnknownTransactionCommitResult erros de confirmação.

Importante

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
read_concern rc_local{};
rc_local.acknowledge_level(read_concern::level::k_local);
read_preference rp_primary{};
rp_primary.mode(read_preference::read_mode::k_primary);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
opts.read_concern(rc_local);
opts.read_preference(rp_primary);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

O exemplo usa a nova Callback API de resposta para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova Callback API de resposta incorpora uma lógica de repetição para erros de confirmação "TransientTransactionError" ou "UnknownTransactionCommitResult" .

Importante

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances.
For example:
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

O exemplo usa a nova Callback API de resposta para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova Callback API de resposta incorpora uma lógica de repetição para erros de confirmação "TransientTransactionError" ou "UnknownTransactionCommitResult" .

Importante

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

Este exemplo usa a API principal. Como a API principal não incorpora lógica de repetição para erros de confirmação "TransientTransactionError" ou "UnknownTransactionCommitResult" , o exemplo inclui lógica explícita para tentar novamente a transação para esses erros:

Importante

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

O exemplo usa a nova Callback API para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova API de chamada de resposta também incorpora uma lógica de repetição para erros de confirmação TransientTransactionError ou UnknownTransactionCommitResult .

Importante

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

O exemplo usa a nova Callback API de resposta para trabalhar com transações, que inicia uma transação, executa as operações especificadas e confirma (ou anula em caso de erro). A nova Callback API de resposta incorpora uma lógica de repetição para erros de confirmação "TransientTransactionError" ou "UnknownTransactionCommitResult" .

Importante

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

Importante

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?
// replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g.
// let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
let client = Client::with_uri_str(uri).await?;
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client
.database("mydb1")
.collection::<Document>("foo")
.insert_one(doc! { "abc": 0})
.await?;
client
.database("mydb2")
.collection::<Document>("bar")
.insert_one(doc! { "xyz": 0})
.await?;
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transaction.
async fn callback(session: &mut ClientSession) -> Result<()> {
let collection_one = session
.client()
.database("mydb1")
.collection::<Document>("foo");
let collection_two = session
.client()
.database("mydb2")
.collection::<Document>("bar");
// Important: You must pass the session to the operations.
collection_one
.insert_one(doc! { "abc": 1 })
.session(&mut *session)
.await?;
collection_two
.insert_one(doc! { "xyz": 999 })
.session(session)
.await?;
Ok(())
}
// Step 2: Start a client session.
let mut session = client.start_session().await?;
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
// abort on error).
session
.start_transaction()
.and_run((), |session, _| callback(session).boxed())
.await?;

Este exemplo usa a API principal. Como a API principal não incorpora lógica de repetição para erros de confirmação "TransientTransactionError" ou "UnknownTransactionCommitResult" , o exemplo inclui lógica explícita para tentar novamente a transação para esses erros:

Importante

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

Dica

Veja também:

Para obter um exemplo em mongosh, consulte mongosh Exemplo.

Para situações que exigem atomicidade de leituras e escritos em vários documentos (em uma única coleção ou várias coleções), o MongoDB suporta transações distribuídas, incluindo transações em conjuntos de réplicas e clusters fragmentados.

As transações distribuídas são atômicas:

  • As transações aplicam todas as alterações de dados ou revertem as alterações.

  • Se uma transação for confirmada, todas as alterações de dados feitas na transação serão salvas e ficarão visíveis fora da transação.

    Até que uma transação seja confirmada, as alterações de dados feitas na transação não serão visíveis fora da transação.

    No entanto, quando uma transação é gravada em vários fragmentos, nem todas as operações de leitura externas precisam esperar que o resultado da transação confirmada fique visível nos fragmentos. Por exemplo, se uma transação estiver comprometida e escrever 1 estiver visível no fragmento A, mas escrever 2 ainda não estiver visível no fragmento B, uma leitura externa em questão de leitura "local" poderá ler os resultados da escrita 1 sem ver a escrita 2.

  • Quando uma transação é abortada, todas as alterações de dados feitas na transação são descartadas sem nunca se tornarem visíveis. Por exemplo, se operação na transação falhar, a transação será abortada e todas as alterações de dados feitas na transação serão descartadas sem que se tornem visíveis.

Importante

Na maioria dos casos, uma transação distribuída incorre em um custo de desempenho maior do que as gravações de um único documento, e a disponibilidade de transações distribuídas não deve substituir o design eficaz do esquema. Em muitos cenários, o modelo de dados desnormalizado (documentos e arrays incorporados) continuará a ser ideal para seus dados e casos de uso. Ou seja, para muitos cenários, modelar seus dados adequadamente minimizará a necessidade de transações distribuídas.

Para considerações adicionais sobre o uso de transações (como limite de tempo de execução e limite de tamanho do oplog), consulte também Considerações de produção.

Transações distribuídas podem ser usadas em várias operações, collections, bancos de dados, documentos e shards.

Para transações:

  • Você pode criar coleções e índices em transações. Para obter detalhes, consulte Crie coleções e índices em uma transação

  • A coletas utilizadas em uma transação podem estar em diferentes bancos de dados.

    Observação

    Você não pode criar uma nova coleta em transações de gravação entre fragmentos. Por exemplo, se você gravar em uma coleta existente em um fragmento e criar implicitamente uma coleta em um fragmento diferente, o MongoDB não poderá executar ambas as operações na mesma transação.

  • Você não pode gravar em coletas limitadas .

  • Não é possível usar read concern "snapshot" ao ler em uma capped collection. (A partir do MongoDB 5.0)

  • Você não pode ler/gravar em coletas nos bancos de dados config, admin ou local.

  • Você não pode gravar na coleção system.*.

  • Não é possível retornar o plano de query da operação compatível utilizando explain ou comandos semelhantes.

  • Para cursores criados fora de uma transação, você não pode chamar getMore dentro da transação.

  • Para cursores criados em uma transação, não é possível chamar getMore fora da transação.

Para obter uma lista de operações sem suporte em transações, consulte Operações restritas.

Dica

Ao criar ou descartar uma coleta imediatamente antes de iniciar uma transação, se a coleta for acessada dentro da transação, emita a operação de criação ou entrega em espera com write concern "majority" para garantir que a transação possa adquirir os bloqueios necessários.

É possível executar as seguintes operações em uma transação distribuída quando a transação não é uma transação de gravação entre fragmentos:

  • Criar coleções

  • Crie índices na nova coleta vazia criada anteriormente na mesma transação.

Ao criar uma coleta dentro de uma transação:

Ao criar um índice dentro de uma transação [1], o índice a ser criado deve estar em:

  • uma coleção inexistente. A coleção é criada como parte da operação.

  • uma nova coleta vazia criada anteriormente na mesma transação.

[1] Você também pode executar db.collection.createIndex() e db.collection.createIndexes() em índices existentes para verificar a existência. Estas operações retornam com sucesso sem criar o índice.
  • Você não pode criar uma nova coleta em transações de gravação entre fragmentos. Por exemplo, se você gravar em uma coleta existente em um fragmento e criar implicitamente uma coleta em um fragmento diferente, o MongoDB não poderá executar ambas as operações na mesma transação.

  • Para a criação explícita de uma collection ou de um índice em uma transação, o nível de read concern da transação deve ser "local".

    Para criar coleções e índices explicitamente, use os seguintes comandos e métodos:

    Comando
    Método

Dica

Veja também:

Para realizar uma operação de contagem dentro de uma transação, use o estágio de agregação $count ou o estágio de agregação $group (com uma expressão $sum de agregação.

Os drivers do MongoDB fornecem uma API de collection countDocuments(filter, options) como um método auxiliar que usa o $group com uma expressão $sum para executar uma contagem.

O mongosh fornece o método auxiliar db.collection.countDocuments(), que usa o $group com a expressão $sum para executar uma contagem.

Para realizar uma operação distinta dentro de uma transação:

  • Para coleções não fragmentadas, você pode usar o db.collection.distinct() método /o distinct comando , bem como o pipeline de agregação com o estágio $group .

  • Para coleções fragmentadas, você não pode utilizar o método db.collection.distinct() ou o comando distinct.

    Para localizar os valores distintos para uma coleção fragmentada, use o pipeline de agregação com o estágio $group. Por exemplo:

    • Em vez de db.coll.distinct("x"), use

      db.coll.aggregate([
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])
    • Em vez de db.coll.distinct("x", { status: "A" }), use:

      db.coll.aggregate([
      { $match: { status: "A" } },
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])

    O pipeline retorna um cursor para um documento:

    { "distinctValues" : [ 2, 3, 1 ] }

    Itere o cursor para acessar o documento de resultados.

Comandos informativos, como hello, buildInfo, connectionStatus (e seus métodos de ajuda) são permitidos em transações; no entanto, eles não podem ser a primeira operação na transação.

As seguintes operações não são permitidas nas transações:

  • As transações são associadas a uma sessão.

  • Você pode ter no máximo uma transação aberta por vez em uma sessão.

  • Ao utilizar os drivers, cada operação da transação deve ser associada à sessão. Consulte a documentação específica do seu driver para obter detalhes.

  • Se uma sessão terminar e houver uma transação aberta, a transação será abortada.

As operações em uma transação usam a preferência de leituraem nível de transação.

Usando os drivers, você pode definir a preferência de leitura no nível da transação no início da transação:

  • Se a preferência de leitura no nível da transação não estiver definida, a transação usará a preferência de leitura no nível da sessão.

  • Se a read preference da transação e da sessão não estiver definida, a transação usará a read preference do cliente. Por padrão, a read preference do cliente é primary.

As transações distribuídas que contêm operações de leitura devem utilizar a preferência de leitura primary. Todas as operações de uma determinada transação devem ser roteadas para o mesmo membro.

As operações em uma transação usam preocupação de leitura no nível da transação. Isso significa que uma preocupação de leitura definida no nível da coleção e do banco de dados é ignorada na transação.

Você pode definir a preocupação de leitura no nível da transação no início da transação.

  • Se a preocupação de leitura no nível da transação não estiver definida, a preocupação de leitura no nível da transação será padronizada para a preocupação de leitura no nível da sessão.

  • Se a preocupação de leitura no nível da transação e no nível da sessão não estiverem definidos, a preocupação de leitura no nível da transação será padronizada para a preocupação de leitura no nível do cliente. Por padrão, a preocupação de leitura no nível do cliente é "local" para leituras no primário. Veja também:

As transações suportam os seguintes níveis de preocupação de leitura:

  • A preocupação de leitura "local" retorna os dados mais recentes disponíveis no nó, mas pode ser revertida.

  • Em um conjunto de réplicas, mesmo que uma transação use read concern local, você poderá observar um isolamento de leitura mais forte quando a operação ler a partir de um snapshot no momento em que a transação foi aberta.

  • Nas transações em cluster fragmentado, a preocupação de leitura "local" não garante que os dados sejam do mesmo snapshot dos fragmentos. Se o isolamento de snapshot for necessário, use a preocupação de leitura "snapshot".

  • Você pode criar coleções e índices dentro de uma transação. Se estiver criando explicitamente um coleção ou um índice, a transação deverá usar a preocupação de leitura "local". Se você criar implicitamente uma coleção, poderá usar qualquer uma das preocupações de leitura disponíveis para transações.

  • Se a transação for confirmada com a preocupação de gravação "maioria", a preocupação de leitura "majority" retornará dados que foram reconhecidos pela maioria dos nós do conjunto de réplicas e não podem ser revertidos. Caso contrário, a preocupação de leitura "majority" não garante que as operações de leitura lerão dados confirmados pela maioria.

  • Nas transações em cluster fragmentado, a preocupação de leitura "majority" não garante que os dados sejam do mesmo snapshot dos fragmentos. Se o isolamento de snapshot for necessário, use a preocupação de leitura "snapshot".

  • Preocupação de leitura "snapshot" retorna dados de um snapshot de dados comprometidos majoritariamente se a transação for confirmada com a preocupação de gravação na "maioria".

  • Se a transação não usar a preocupação de gravação "maioria" para a confirmação, a preocupação de leitura "snapshot" não fornecerá nenhuma garantia de que as operações de leitura usaram um snapshot de dados confirmados pela maioria.

  • Para transações em clusters fragmentados, a visualização "snapshot" dos dados é sincronizada entre fragmentos.

As transações usam a preocupação de gravação em nível de transação para confirmar as operações de gravação As operações de gravação dentro das transações devem ser executadas sem uma especificação explícita de preocupação de gravação e usar a preocupação de gravação padrão. No momento de confirmação, as gravações são confirmadas usando a preocupação de gravação no nível da transação.

Dica

Não defina explicitamente a preocupação de gravação para as operações individuais de gravação dentro de uma transação. Definir preocupações de gravação para operações de gravação individuais dentro de uma transação retorna um erro.

Você pode definir a write concern no nível da transação no início da transação:

  • Se a preocupação de gravação no nível da transação não estiver definida, a preocupação de gravação no nível da transação será padronizada para a preocupação de gravação no nível da sessão para o commit.

  • Se a preocupação de gravação no nível da transação e a preocupação de gravação no nível da sessão não estiverem definidas, a preocupação de gravação no nível da transação será padronizada para a preocupação de gravação no nível do cliente de:

As transações suportam todos os valores w de preocupação de gravação, incluindo:

  • Write concern w: 1 retorna a confirmação após a confirmação ter sido aplicada ao principal.

    Importante

    Quando você confirma comw: 1, sua transação pode ser revertida se houver um failover.

  • Quando você confirma comw: 1 write concern, a preocupação de leitura de "majority" no nível da transação não fornece garantias de que as operações de leitura na transação leiam os dados confirmados pela maioria.

  • Quando você se compromete com a write concern w: 1, read concern "snapshot" no nível da transação não garante que as operações de leitura na transação usaram um snapshot dos dados confirmados pela maioria.

  • A preocupação de gravação w: "majority" retorna o reconhecimento após a aplicação da confirmação à maioria dos nós votantes.

  • Quando você se compromete com a write concern w: "majority", a read concern "majority" no nível da transação garante que as operações tenham lido os dados confirmados pela maioria. Para transações em cluster fragmentado, esta visualização dos dados comprometidos pela maioria não é sincronizada entre os fragmentos.

  • Quando você se compromete comw: "majority" write concern, a read concern de "snapshot" no nível de transação garante que as operações tenham lido a partir de um snapshot sincronizado de dados confirmados pela maioria.

Observação

Independentemente da preocupação de gravação especificada para a transação, a operação de confirmação para um cluster fragmentado inclui algumas partes que usam a preocupação de gravação {w: "majority", j: true}.

O parâmetro do servidor coordinateCommitReturnImmediatelyAfterPersistingDecision controla quando as decisões feitas na transação são devolvidas ao cliente.

O parâmetro foi introduzido no MongDB 5.0 com um valor padrão de true. No MongoDB 5.0.10, o valor padrão muda para false.

Quando coordinateCommitReturnImmediatelyAfterPersistingDecision é false, o coordenador de transações do fragmento aguarda que todos os membros confirmem a confirmação de uma transação com vários documentos antes de retornar a decisão de confirmação para o cliente.

Se você especificar uma write concern "majority" para uma transação com vários documentos e a transação não for replicada para a maioria calculada dos membros do conjunto de réplicas , a transação poderá não ser revertida imediatamente nos membros do conjunto de réplicas. O conjunto de réplicas será eventualmente consistente. Uma transação é sempre aplicada ou revertida em todos os membros do conjunto de réplicas.

Independentemente da preocupação de gravação especificada para a transação, o driver aplica w: "majority" como a preocupação de gravação ao tentar commitTransaction novamente.

As seções a seguir descrevem mais considerações para transações.

Para transações em ambientes de produção, consulte Considerações de produção. Além disso, para clusters fragmentados, consulte Considerações de produção (clusters fragmentados).

Você não pode alterar uma chave de shard usando uma transação se o conjunto de réplicas tiver um arbiter. Os árbitros não podem participar das operações de dados necessárias para transações com vários shards.

As transações cujas operações de gravação abrangem vários fragmentos gerarão o erro e serão abortadas se qualquer operação de transação ler ou gravar em um fragmento que contenha um árbitro.

Não é possível executar transações em um cluster fragmentado que tenha um shard com writeConcernMajorityJournalDefault definido como false (como um shard com um membro votante que usa o mecanismo de armazenamento na memória).

Observação

Independentemente da preocupação de gravação especificada para a transação, a operação de confirmação para um cluster fragmentado inclui algumas partes que usam a preocupação de gravação {w: "majority", j: true}.

Para obter o status e as métricas da transação, use os seguintes métodos:

Fonte
Devoluções

Retorna métricas de transações.

Observação

Alguns campos de resposta serverStatus não são retornados em clusters do MongoDB Atlas M0/M2/M5 . Para mais informações, consulte Comandos limitados na documentação do MongoDB Atlas.

$currentOp aggregation pipeline

Retorna:

Retorna:

mongod e mongos mensagens de log
Inclui informações sobre transações lentas (que são transações que excedem o limite deoperationProfiling.slowOpThresholdMs) no componente de log TXN.

Para usar transações, a featureCompatibilityVersion para todos os membros da implantação deve ser pelo menos:

Implantação
Mínimo featureCompatibilityVersion
Conjunto de réplicas
4.0
Cluster fragmentado
4.2

Para verificar o fCV de um membro, conecte-se ao membro e execute o comando:

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

Para mais informações, consulte a página de referência do setFeatureCompatibilityVersion.

As transações distribuídas são compatíveis com conjuntos de réplicas e clusters fragmentados onde:

  • o primário usa o mecanismo de armazenamento WiredTiger e

  • os membros secundários usam o mecanismo de armazenamento WiredTiger ou os mecanismos de armazenamento na memória .

Observação

Você não pode executar transação em um cluster fragmentado que tenha um shard com writeConcernMajorityJournalDefault definido como false, como um shard com um nó votante que usa o storage engine na memória.

Voltar

update

Próximo

API de drivers