Docs 菜单
Docs 主页
/
MongoDB Manual

事务

在此页面上

  • 事务 API
  • 事务和原子性
  • 事务和操作
  • 事务和会话
  • 读关注/写关注/读取偏好
  • 基本信息
  • 了解详情

在 MongoDB 中,对单个文档的操作具有原子性。由于您可以使用嵌入式文档和数组来捕获单个文档结构中数据之间的关系,而无需跨多个文档和集合进行标准化,因此这种单文档原子性消除了许多实际使用案例使用分布式事务的必要性。

对于需要对多个文档(在单个或多个集合中)的读写操作具有原子性的情况,MongoDB 支持多文档事务。利用分布式事务,可以跨多个操作、集合、数据库、文档和分片使用事务。

此页面上的信息适用于在以下环境中托管的部署:

  • MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务

  • MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本

  • MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本


➤ 使用右上角的选择语言下拉菜单来设置以下示例的语言。


此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(例如 DuplicateKeyError)可能会结束 ACID 事务并导致命令错误,以警报用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在事务上使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances.
For example:
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:

  • 启动事务

  • 执行指定操作

  • 提交结果或出现错误时结束事务

服务器端操作中的错误(如 DuplicateKeyError)可能会结束事务并导致命令错误,以提醒用户事务已结束。即使客户端从未调用 Session.abortTransaction(),也会发生此行为,这是预料之中的。要合并自定义错误处理,请在您的事务中使用 Core API

Callback API回调API包含某些错误的重试逻辑。在发生 TransientTransactionError UnknownTransactionCommitResult提交错误后,驾驶员会尝试重新运行ACID 事务。

从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?
// replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g.
// let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
let client = Client::with_uri_str(uri).await?;
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client
.database("mydb1")
.collection::<Document>("foo")
.insert_one(doc! { "abc": 0})
.await?;
client
.database("mydb2")
.collection::<Document>("bar")
.insert_one(doc! { "xyz": 0})
.await?;
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transaction.
async fn callback(session: &mut ClientSession) -> Result<()> {
let collection_one = session
.client()
.database("mydb1")
.collection::<Document>("foo");
let collection_two = session
.client()
.database("mydb2")
.collection::<Document>("bar");
// Important: You must pass the session to the operations.
collection_one
.insert_one(doc! { "abc": 1 })
.session(&mut *session)
.await?;
collection_two
.insert_one(doc! { "xyz": 999 })
.session(session)
.await?;
Ok(())
}
// Step 2: Start a client session.
let mut session = client.start_session().await?;
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
// abort on error).
session
.start_transaction()
.and_run((), |session, _| callback(session).boxed())
.await?;

此示例使用 Core API。由于 Core API 并不包含针对 TransientTransactionErrorUnknownTransactionCommitResult 提交错误的重试逻辑,因此示例中包含了针对这些错误重试事务的显式逻辑:

重要

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

提示

另请参阅:

有关 mongosh 中的例子,请参见mongosh 示例。

对于需要对多个文档(在单个或多个集合中)原子性读取和写入的情况,MongoDB 支持分布式事务,包括副本集和分片集群上的事务。

分布式事务具有原子性:

  • 事务要么应用所有数据更改,要么回滚更改。

  • 在事务提交时,事务中所做的所有数据更改都会保存,并且在事务之外可见。

    在事务进行提交前,在事务中所做的数据更改在事务外不可见。

    不过,当事务写入多个分片时,并非所有外部读取操作都需等待已提交事务的结果在各个分片上可见。例如,如果事务已提交并且写入 1 在分片 A 上可见,但写入 2 在分片 B 上尚不可见,则读关注 "local" 处的外部读取可以在不看到写入 2 的情况下读取写入 1 的结果。

  • 事务中止后,在事务中所做的所有数据更改会被丢弃且不会变得可见。例如,如果事务中的任何操作失败,事务就会中止,事务中所做的所有数据更改将被丢弃且不会变得可见。

重要

在大多数情况下,与单文档写入操作相比,分布式事务会产生更高的性能成本,并且分布式事务的可用性不应取代有效的模式设计。在许多情况下,非规范化数据模型(嵌入式文档和数组)仍然是数据和使用案例的最佳选择。换言之,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。

有关其他事务使用注意事项(如运行时间限制和 oplog 大小限制),另请参阅生产注意事项

提示

另请参阅:

可以跨多个操作、集合、数据库、文档和分片使用分布式事务。

对于事务:

  • 可以在事务中创建集合和索引。有关详细信息,请参阅在事务中创建集合和索引

  • 事务中使用的集合可以位于不同的数据库中。

    注意

    您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。

  • 不能写入固定大小集合。

  • 固定大小集合读取时不能使用读关注 "snapshot"。(从 MongoDB 5.0 开始)

  • 不能在 configadminlocal 数据库中读取/写入集合。

  • 不能写入 system.* 集合。

  • 不能使用 explain 或类似命令返回受支持操作的查询计划。

  • 对于在 ACID 事务外部创建的游标,无法在 ACID 事务内部调用 getMore

  • 对于在事务中创建的游标,无法在事务外部调用 getMore

  • 您不能将 killCursors 命令指定为事务中的第一个操作。

    此外,如果在ACID 事务中运行killCursors 命令,服务器会立即停止指定的游标。它不会等待ACID 事务提交。

有关事务中不支持的操作列表,请参阅限制操作。

提示

在启动事务之前创建或删除集合时,如果在事务内部访问该集合,请发出带有写关注 "majority" 的创建或删除操作,以确保事务可以获取所需的锁。

提示

另请参阅:

如果事务不是跨分片写入事务,则可以在分布式事务中执行以下操作:

  • 创建集合。

  • 在先前同一事务中创建的新空集合上创建索引。

在事务中创建集合时:

在事务内创建索引 [1] 时,要创建的索引必须位于以下位置之一:

  • 不存在的集合。集合作为操作的一部分创建。

  • 先前在同一事务中创建的新空集合。

[1] 您还可以对现有索引运行 db.collection.createIndex()db.collection.createIndexes() 以检查其是否存在。这些操作成功返回而不创建索引。
  • 您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。

  • 当以分片集合为目标时,您无法在事务中使用 $graphLookup 阶段。

  • 要在事务内显式创建集合或索引,事务读关注级别必须为 "local"

    要显式创建集合和索引,请使用以下命令和方法:

    命令
    方法

提示

另请参阅:

要在事务内执行计数操作,请使用 $count 聚合阶段或 $group(带有 $sum 表达式)聚合阶段。

MongoDB 驱动程序提供集合级 API countDocuments(filter, options) 作为辅助方法,该方法使用 $group$sum 表达式来执行计数。

mongosh 提供 db.collection.countDocuments() 辅助方法,该方法使用 $group$sum 表达式进行计数。

如要在事务中执行不同的操作:

  • 对于未分片的集合,可以使用 db.collection.distinct() 方法/distinct 命令以及带有 $group 阶段的聚合管道。

  • 对于分片集合,不能使用 db.collection.distinct() 方法或 distinct 命令。

    要查找分片集合的不同值,请改用带有 $group 阶段的 aggregation pipeline。例如:

    • 不使用 db.coll.distinct("x"),而是使用

      db.coll.aggregate([
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])
    • 不使用 db.coll.distinct("x", { status: "A" }),而是使用

      db.coll.aggregate([
      { $match: { status: "A" } },
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])

    管道返回一个指向文档的游标:

    { "distinctValues" : [ 2, 3, 1 ] }

    迭代游标以访问结果文档。

事务中允许使用诸如 hellobuildInfoconnectionStatus(及其辅助方法)之类的信息命令,但它们不能是事务中的第一项操作。

事务中不允许执行以下操作:

  • 事务与会话关联。

  • 一个会话一次最多可以具有一个未结事务。

  • 使用驱动程序时,事务中的每项操作都必须与会话关联。有关详细信息,请参阅驱动程序特定文档。

  • 如果会话结束并且具有打开的事务,则事务将中止。

事务中的操作使用事务级读取偏好

使用驱动程序,您可以在事务启动时设置事务级读取偏好

  • 如果未设置事务级别的读取偏好,则事务将使用会话级别的读取偏好。

  • 如果未设置事务级别和会话级别的读取偏好,则事务将使用客户端级别的读取偏好。默认情况下,客户端级别的读取偏好为 primary

包含读取操作的分布式事务必须使用读取偏好 primary。给定事务中的所有操作必须路由到同一节点。

事务中的操作使用事务级读关注。也就是说,在集合和数据库级别设置的任何读关注在事务中都会被忽略。

您可以在事务启动时设置事务级别的读关注

  • 如果未设置事务级别的读关注,则事务级别的读关注默认为会话级别的读关注。

  • 如果未设置事务级读关注和会话级读关注,则事务级读关注默认为客户端级读关注。默认情况下,对于主节点上的读取,客户端级读关注是 "local"。另请参阅:

事务支持以下读关注级别:

  • 读关注 "local" 返回节点中可用的最新数据,但可以回滚。

  • 在副本集上,即使ACID 事务使用读关注(read concern)local ,您也可能会观察到更强的读隔离性性,因为该操作会从ACID 事务打开点的快照中读取。

  • 对于分片集群上的事务,读关注 "local" 无法保证数据来自跨分片的同一快照视图。如果需要快照隔离,请使用读关注 "snapshot"

  • 您可以在事务中创建集合和索引。如要显式创建集合或索引,则事务必须使用读关注 "local"。如果隐式创建集合,则可以使用任何可用于事务的读关注。

  • 如果事务以写关注“majority”提交,则读关注 "majority" 返回已被多数副本集节点确认且无法回滚的数据。否则,读关注 "majority" 不保证读取操作读取多数提交的数据。

  • 对于分片集群上的事务,读关注 "majority" 无法保证数据来自跨分片的同一快照视图。如果需要快照隔离,请使用读关注 "snapshot"

事务使用事务级写关注来提交写入操作。事务内的写入操作必须在没有明确写关注规范的情况下执行,并须使用默认的写关注。在提交时,使用事务级写关注来提交写入。

提示

请勿为事务中的各个写入操作显式设置写关注。为事务内的各个写入操作设置写关注会返回错误消息。

您可以在事务启动时设置事务级写关注

  • 如果未设置事务级别的写关注,则事务级别的写关注默认为提交的会话级别写关注。

  • 如果未设置事务级别的写关注和会话级别的的写关注,则事务级别的写关注默认为 的客户端级别的写关注,

事务支持所有写关注 w 值,包括:

  • 写关注 w: 1 会在提交应用于主节点后返回确认信息。

    重要

  • 使用w: 1写入关注提交时,事务级 "majority" 读关注无法保证事务中的读操作会读取大多数已提交数据。

  • 使用 w: 1 写关注提交时,事务级 "snapshot" 读关注无法保证事务中的读操作会使用大多数已提交数据的快照。

  • 在将提交应用于大多数投票节点后,写关注 w: "majority" 会返回确认消息。

  • 使用 w: "majority" 写关注提交时,事务级 "majority" 读关注可以保证操作已读取大多数已提交数据。对于分片集群上的事务,大多数已提交数据的视图不会在各分片之间同步。

  • 使用 w: "majority" 写关注提交时,事务级 "snapshot" 读关注可以保证操作已从大多数已提交数据的同步快照中读取。

注意

无论为事务指定了何种写关注,分片集群事务的提交操作始终包括一些使用 {w: "majority", j: true} 写关注的部分。

服务器参数 coordinateCommitReturnImmediatelyAfterPersistingDecision 可控制何时将事务提交决策返回给客户端。

该参数是在 MongDB 5.0 中引入的,默认值为 true。。在 MongoDB 6.1 中,该默认值更改为 false

coordinateCommitReturnImmediatelyAfterPersistingDecisionfalse 时,分片事务协调器会等待所有成员确认多文档事务提交,然后再将提交决策返回给客户端。

如果为多文档事务指定 "majority" 写关注,并且该事务未能复制到计算出的多数副本集成员,则该事务可能不会立即在副本集成员上回滚。副本集最终将保持一致。事务始终在所有副本集节点上应用或回滚。

无论为事务指定了何种写关注 ,驱动程序在重试 commitTransaction 时都会应用 w: "majority" 作为写关注。

以下各节描述了事务的更多注意事项。

有关生产环境中的事务,请参阅生产环境注意事项。此外,有关分片集群,请参阅生产环境注意事项(分片集群)

如果副本集具有仲裁节点节点,则无法使用ACID 事务更改分分片键。 仲裁节点无法参与多分片事务所需的数据操作。

如果任何事务操作读取或写入包含仲裁节点的分片,则写入操作跨越多个分片的事务将出现错误并中止。

您无法在具有将 writeConcernMajorityJournalDefault 设置为 false 的分片(例如具有使用内存中存储引擎的投票节点的分片)的分片集群上运行事务。

注意

无论为事务指定了何种写关注,分片集群事务的提交操作始终包括一些使用 {w: "majority", j: true} 写关注的部分。

要获取事务状态和指标,请使用以下方法:

返回:

返回事务指标。

MongoDB Atlas M 0 /M 2 /M 5集群上不会返回某些serverStatus响应字段。 有关更多信息,请参阅MongoDB Atlas文档中的有限命令

$currentOp 聚合管道

返回:

返回:

mongodmongos 日志消息
TXN 日志组件中包含有关慢速事务(即超过 operationProfiling.slowOpThresholdMs 阈值的事务)的信息。

要使用事务,所有部署节点的 featureCompatibilityVersion 必须至少为:

部署
最低 featureCompatibilityVersion
副本集(Replica Set)
4.0
分片集群
4.2

要检查成员的 FCV,请连接到该成员并运行以下命令:

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

更多信息,请参阅 setFeatureCompatibilityVersion 参考页。

副本集和分片集群支持的分布式事务,其中:

  • 主节点使用 WiredTiger 存储引擎,而

  • 从节点使用 WiredTiger 存储引擎或内存存储引擎。

注意

您无法在具有将 writeConcernMajorityJournalDefault 设置为 false 的分片(例如具有使用内存中存储引擎的投票成员的分片)的分片集群上运行事务。

从 MongoDB 5.2(和 5.0.4)开始:

后退

update