Docs 菜单
Docs 主页
/
MongoDB Manual
/

驱动程序 API

在此页面上

  • Callback API 与 Core API
  • 回调 API
  • Core API
  • 驱动程序版本
  • 事务错误处理
  • 更多信息

Callback API:

核心 API:

回调 API 包含以下逻辑:

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误时不会重试事务。


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


重要

  • 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。

  • 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。

  • ACID 事务中的操作使用事务级读关注(read concern)事务级写关注(write concern)事务级读取偏好(read preference)。

  • 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

重要

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances; e.g.
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations..
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

注意

对于Perl驾驶员,请参阅Core API使用示例。

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

重要

该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑。

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

注意

对于Scala驾驶员,请参阅Core API使用示例。

核心事务 API 不包含标记为以下错误的重试逻辑:


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


以下示例包含在出现暂时性错误时重试事务的逻辑,以及在出现未知提交错误时重试提交的逻辑:

/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t) (mongoc_client_session_t *, bson_t *, bson_error_t *);
/* runs transactions with retry logic */
bool
run_transaction_with_retry (txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* perform transaction */
r = txn_func (cs, &reply, error);
if (r) {
/* success */
bson_destroy (&reply);
return true;
}
MONGOC_WARNING ("Transaction aborted: %s", error->message);
if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
/* on transient error, retry the whole transaction */
MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
bson_destroy (&reply);
} else {
/* non-transient error */
break;
}
}
bson_destroy (&reply);
return false;
}
/* commit transactions with retry logic */
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* commit uses write concern set at transaction start, see
* mongoc_transaction_opts_set_write_concern */
r = mongoc_client_session_commit_transaction (cs, &reply, error);
if (r) {
MONGOC_DEBUG ("Transaction committed");
break;
}
if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
bson_destroy (&reply);
} else {
/* commit failed, cannot retry */
break;
}
}
bson_destroy (&reply);
return r;
}
/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info (mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error)
{
mongoc_client_t *client;
mongoc_collection_t *employees;
mongoc_collection_t *events;
mongoc_read_concern_t *rc;
mongoc_write_concern_t *wc;
mongoc_transaction_opt_t *txn_opts;
bson_t opts = BSON_INITIALIZER;
bson_t *filter = NULL;
bson_t *update = NULL;
bson_t *event = NULL;
bool r;
bson_init (reply);
client = mongoc_client_session_get_client (cs);
employees = mongoc_client_get_collection (client, "hr", "employees");
events = mongoc_client_get_collection (client, "reporting", "events");
rc = mongoc_read_concern_new ();
mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_w (
wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_read_concern (txn_opts, rc);
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
r = mongoc_client_session_start_transaction (cs, txn_opts, error);
if (!r) {
goto done;
}
r = mongoc_client_session_append (cs, &opts, error);
if (!r) {
goto done;
}
filter = BCON_NEW ("employee", BCON_INT32 (3));
update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
/* mongoc_collection_update_one will reinitialize reply */
bson_destroy (reply);
r = mongoc_collection_update_one (employees, filter, update, &opts, reply, error);
if (!r) {
goto abort;
}
event = BCON_NEW ("employee", BCON_INT32 (3));
BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");
bson_destroy (reply);
r = mongoc_collection_insert_one (events, event, &opts, reply, error);
if (!r) {
goto abort;
}
r = commit_with_retry (cs, error);
abort:
if (!r) {
MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
mongoc_client_session_abort_transaction (cs, NULL);
}
done:
mongoc_collection_destroy (employees);
mongoc_collection_destroy (events);
mongoc_read_concern_destroy (rc);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
bson_destroy (&opts);
bson_destroy (filter);
bson_destroy (update);
bson_destroy (event);
return r;
}
void
example_func (mongoc_client_t *client)
{
mongoc_client_session_t *cs;
bson_error_t error;
bool r;
ASSERT (client);
cs = mongoc_client_start_session (client, NULL, &error);
if (!cs) {
MONGOC_ERROR ("Could not start session: %s", error.message);
return;
}
r = run_transaction_with_retry (update_employee_info, cs, &error);
if (!r) {
MONGOC_ERROR ("Could not update employee, permanent error: %s", error.message);
}
mongoc_client_session_destroy (cs);
}
using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
while (true) {
try {
txn_func(session); // performs transaction.
break;
} catch (const operation_exception& oe) {
std::cout << "Transaction aborted. Caught exception during transaction."
<< std::endl;
// If transient error, retry the whole transaction.
if (oe.has_error_label("TransientTransactionError")) {
std::cout << "TransientTransactionError, retrying transaction ..."
<< std::endl;
continue;
} else {
throw oe;
}
}
}
};
auto commit_with_retry = [](client_session& session) {
while (true) {
try {
session.commit_transaction(); // Uses write concern set at transaction start.
std::cout << "Transaction committed." << std::endl;
break;
} catch (const operation_exception& oe) {
// Can retry commit
if (oe.has_error_label("UnknownTransactionCommitResult")) {
std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
<< std::endl;
continue;
} else {
std::cout << "Error during commit ..." << std::endl;
throw oe;
}
}
}
};
// Updates two collections in a transaction
auto update_employee_info = [&](client_session& session) {
auto& client = session.client();
auto employees = client["hr"]["employees"];
auto events = client["reporting"]["events"];
options::transaction txn_opts;
read_concern rc;
rc.acknowledge_level(read_concern::level::k_snapshot);
txn_opts.read_concern(rc);
write_concern wc;
wc.acknowledge_level(write_concern::level::k_majority);
txn_opts.write_concern(wc);
session.start_transaction(txn_opts);
try {
employees.update_one(
make_document(kvp("employee", 3)),
make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
events.insert_one(make_document(
kvp("employee", 3),
kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
} catch (const operation_exception& oe) {
std::cout << "Caught exception during transaction, aborting." << std::endl;
session.abort_transaction();
throw oe;
}
commit_with_retry(session);
};
auto session = client.start_session();
try {
run_transaction_with_retry(update_employee_info, session);
} catch (const operation_exception& oe) {
// Do something with error.
throw oe;
}
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
while (true)
{
try
{
txnFunc(client, session); // performs transaction
break;
}
catch (MongoException exception)
{
// if transient error, retry the whole transaction
if (exception.HasErrorLabel("TransientTransactionError"))
{
Console.WriteLine("TransientTransactionError, retrying transaction.");
continue;
}
else
{
throw;
}
}
}
}
public void CommitWithRetry(IClientSessionHandle session)
{
while (true)
{
try
{
session.CommitTransaction();
Console.WriteLine("Transaction committed.");
break;
}
catch (MongoException exception)
{
// can retry commit
if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
{
Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
continue;
}
else
{
Console.WriteLine($"Error during commit: {exception.Message}.");
throw;
}
}
}
}
// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");
session.StartTransaction(new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority));
try
{
employeesCollection.UpdateOne(
session,
Builders<BsonDocument>.Filter.Eq("employee", 3),
Builders<BsonDocument>.Update.Set("status", "Inactive"));
eventsCollection.InsertOne(
session,
new BsonDocument
{
{ "employee", 3 },
{ "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
});
}
catch (Exception exception)
{
Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
session.AbortTransaction();
throw;
}
CommitWithRetry(session);
}
public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
// start a session
using (var session = client.StartSession())
{
try
{
RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
}
catch (Exception exception)
{
// do something with error
Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");
}
}
}
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error {
for {
err := txnFn(ctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
commitWithRetry := func(ctx context.Context) error {
sess := mongo.SessionFromContext(ctx)
for {
err := sess.CommitTransaction(ctx)
switch e := err.(type) {
case nil:
log.Println("Transaction committed.")
return nil
case mongo.CommandError:
// Can retry commit
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation...")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}
// Updates two collections in a transaction.
updateEmployeeInfo := func(ctx context.Context) error {
employees := client.Database("hr").Collection("employees")
events := client.Database("reporting").Collection("events")
sess := mongo.SessionFromContext(ctx)
err := sess.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
}
_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
return commitWithRetry(ctx)
}
txnOpts := options.Transaction().SetReadPreference(readpref.Primary())
return client.UseSessionWithOptions(
ctx, options.Session().SetDefaultTransactionOptions(txnOpts),
func(ctx context.Context) error {
return runTransactionWithRetry(ctx, updateEmployeeInfo)
},
)
}

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

void runTransactionWithRetry(Runnable transactional) {
while (true) {
try {
transactional.run();
break;
} catch (MongoException e) {
System.out.println("Transaction aborted. Caught exception during transaction.");
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
System.out.println("TransientTransactionError, aborting transaction and retrying ...");
continue;
} else {
throw e;
}
}
}
}
void commitWithRetry(ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
System.out.println("Transaction committed");
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
System.out.println("Exception during commit ...");
throw e;
}
}
}
}
void updateEmployeeInfo() {
MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = client.startSession()) {
clientSession.startTransaction(txnOptions);
employeesCollection.updateOne(clientSession,
Filters.eq("employee", 3),
Updates.set("status", "Inactive"));
eventsCollection.insertOne(clientSession,
new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));
commitWithRetry(clientSession);
}
}
void updateEmployeeInfoWithRetry() {
runTransactionWithRetry(this::updateEmployeeInfo);
}

注意

对于Motor ,请参阅Callback API

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

async function commitWithRetry(session) {
try {
await session.commitTransaction();
console.log('Transaction committed.');
} catch (error) {
if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
console.log('UnknownTransactionCommitResult, retrying commit operation ...');
await commitWithRetry(session);
} else {
console.log('Error during commit ...');
throw error;
}
}
}
async function runTransactionWithRetry(txnFunc, client, session) {
try {
await txnFunc(client, session);
} catch (error) {
console.log('Transaction aborted. Caught exception during transaction.');
// If transient error, retry the whole transaction
if (error.hasErrorLabel('TransientTransactionError')) {
console.log('TransientTransactionError, retrying transaction ...');
await runTransactionWithRetry(txnFunc, client, session);
} else {
throw error;
}
}
}
async function updateEmployeeInfo(client, session) {
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' },
readPreference: 'primary'
});
const employeesCollection = client.db('hr').collection('employees');
const eventsCollection = client.db('reporting').collection('events');
await employeesCollection.updateOne(
{ employee: 3 },
{ $set: { status: 'Inactive' } },
{ session }
);
await eventsCollection.insertOne(
{
employee: 3,
status: { new: 'Inactive', old: 'Active' }
},
{ session }
);
try {
await commitWithRetry(session);
} catch (error) {
await session.abortTransaction();
throw error;
}
}
return client.withSession(session =>
runTransactionWithRetry(updateEmployeeInfo, client, session)
);

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
while (true) {
try {
$txnFunc($client, $session); // performs transaction
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
// If transient error, retry the whole transaction
if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
continue;
} else {
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
throw $error;
}
}
}
private function commitWithRetry3(\MongoDB\Driver\Session $session): void
{
while (true) {
try {
$session->commitTransaction();
echo "Transaction committed.\n";
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
continue;
} else {
echo "Error during commit ...\n";
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Error during commit ...\n";
throw $error;
}
}
}
private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
$session->startTransaction([
'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),
'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
]);
try {
$client->hr->employees->updateOne(
['employee' => 3],
['$set' => ['status' => 'Inactive']],
['session' => $session],
);
$client->reporting->events->insertOne(
['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],
['session' => $session],
);
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Caught exception during transaction, aborting.\n";
$session->abortTransaction();
throw $error;
}
$this->commitWithRetry3($session);
}
private function doUpdateEmployeeInfo(\MongoDB\Client $client): void
{
// Start a session.
$session = $client->startSession();
try {
$this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
} catch (\MongoDB\Driver\Exception\Exception) {
// Do something with error
}
}

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

def run_transaction_with_retry(txn_func, session):
while True:
try:
txn_func(session) # performs transaction
break
except (ConnectionFailure, OperationFailure) as exc:
# If transient error, retry the whole transaction
if exc.has_error_label("TransientTransactionError"):
print("TransientTransactionError, retrying transaction ...")
continue
else:
raise
def commit_with_retry(session):
while True:
try:
# Commit uses write concern set at transaction start.
session.commit_transaction()
print("Transaction committed.")
break
except (ConnectionFailure, OperationFailure) as exc:
# Can retry commit
if exc.has_error_label("UnknownTransactionCommitResult"):
print("UnknownTransactionCommitResult, retrying commit operation ...")
continue
else:
print("Error during commit ...")
raise
# Updates two collections in a transactions
def update_employee_info(session):
employees_coll = session.client.hr.employees
events_coll = session.client.reporting.events
with session.start_transaction(
read_concern=ReadConcern("snapshot"),
write_concern=WriteConcern(w="majority"),
read_preference=ReadPreference.PRIMARY,
):
employees_coll.update_one(
{"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
)
events_coll.insert_one(
{"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
)
commit_with_retry(session)
# Start a session.
with client.start_session() as session:
try:
run_transaction_with_retry(update_employee_info, session)
except Exception:
# Do something with error.
raise

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

def run_transaction_with_retry(session)
begin
yield session # performs transaction
rescue Mongo::Error => e
puts 'Transaction aborted. Caught exception during transaction.'
raise unless e.label?('TransientTransactionError')
puts "TransientTransactionError, retrying transaction ..."
retry
end
end
def commit_with_retry(session)
begin
session.commit_transaction
puts 'Transaction committed.'
rescue Mongo::Error => e
if e.label?('UnknownTransactionCommitResult')
puts "UnknownTransactionCommitResult, retrying commit operation ..."
retry
else
puts 'Error during commit ...'
raise
end
end
end
# updates two collections in a transaction
def update_employee_info(session)
employees_coll = session.client.use(:hr)[:employees]
events_coll = session.client.use(:reporting)[:events]
session.start_transaction(read_concern: { level: :snapshot },
write_concern: { w: :majority },
read: {mode: :primary})
employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
session: session)
events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
session: session)
commit_with_retry(session)
end
session = client.start_session
begin
run_transaction_with_retry(session) do
update_employee_info(session)
end
rescue StandardError => e
# Do something with error
raise
end

重要

要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

无论是何种数据库系统,无论是 MongoDB 还是关系型数据库,应用程序都应采取相应措施来处理事务提交期间的错误,并包含事务的重试逻辑。

事务中的单独写入操作不可重试,无论 retryWrites 的值如何。如果操作遇到与标签相关的 "TransientTransactionError" 错误(例如主节点降级时),可以将事务作为一个整体进行重试。

  • 回调 API 包含 "TransientTransactionError" 的重试逻辑。

  • 核心事务 API 不包含 "TransientTransactionError" 的重试逻辑。要处理 "TransientTransactionError",应用程序应显式包含错误的重试逻辑。要查看包含瞬时错误重试逻辑的示例,请参阅 Core API 示例

提交操作是可重试写入操作。如果提交操作遇到错误,那么无论 retryWrites 的值如何,MongoDB 驱动程序都会重试提交。

如果提交操作遇到标记为 "UnknownTransactionCommitResult" 的错误,则可以重试提交。

  • 回调 API 包含 "UnknownTransactionCommitResult" 的重试逻辑。

  • 核心事务 API 不包含 "UnknownTransactionCommitResult" 的重试逻辑。要处理 "UnknownTransactionCommitResult",应用程序应显式包含错误的重试逻辑。要查看包含未知提交错误的重试逻辑示例,请参阅 Core API 示例。

6.2 版本新增

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。此错误意味着缓存过小,重试可能会失败。

transactionTooLargeForCacheThreshold 阈值的默认值为 0.75。当事务使用超过 75% 的缓存时,服务器会返回 TransactionTooLargeForCache 而不是重试事务。

在 MongoDB 的早期版本中,服务器会返回 TemporarilyUnavailableWriteConflict,而不是 TransactionTooLargeForCache

使用 setParameter 命令修改错误阈值。

以下 mongosh 方法适用于事务:

注意

为简单起见,mongosh 示例省略了重试逻辑和强大的错误处理功能。有关在应用程序中合并事务的更实际示例,请改为参阅事务错误处理

// Create collections:
db.getSiblingDB("mydb1").foo.insertOne(
{abc: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
db.getSiblingDB("mydb2").bar.insertOne(
{xyz: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );
coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;
// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );
// Operations inside the transaction
try {
coll1.insertOne( { abc: 1 } );
coll2.insertOne( { xyz: 999 } );
} catch (error) {
// Abort transaction on error
session.abortTransaction();
throw error;
}
// Commit the transaction using write concern set at transaction start
session.commitTransaction();
session.endSession();

后退

事务