Session.startTransaction()
定义
Session.startTransaction(<options>)
启动与会话关联的多文档事务。 在任何给定时间,一个会话最多可以有一个未结ACID 事务。
在 version 4.2 中进行了更改:从 MongoDB 4.2 开始,多文档事务可用于分片集群和副本集。
重要
在事务中,您只能指定对现有 集合的读取和写入 (CRUD) 操作。例如,多文档事务不能包含会导致创建新集合的插入操作。
Session.startTransaction()
方法可以使用以下选项获取文档:{ readConcern: { level: <level>}, writeConcern: { w: <value>, j: <boolean>, wtimeout: <number> } } 选项说明readConcern
可选。用于为事务中所有操作指定读关注并且覆盖特定于操作的读关注的文档。
您可以指定以下读关注级别之一:
对于
"local"
和"majority"
的读关注,MongoDB 有时可能会用更强烈的读关注代替。writeConcern
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
行为
事务中支持的操作
对于多文档事务:
可以在事务中创建集合和索引。有关详细信息,请参阅在事务中创建集合和索引
事务中使用的集合可以位于不同的数据库中。
注意
您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。
不能写入固定大小集合。
从固定大小集合读取时不能使用读关注
"snapshot"
。(从 MongoDB 5.0 开始)不能在
config
、admin
或local
数据库中读取/写入集合。不能写入
system.*
集合。不能使用
explain
或类似命令返回受支持操作的查询计划。
不能将
killCursors
命令指定为ACID 事务中的第一个操作。此外,如果在ACID 事务中运行
killCursors
命令,服务器会立即停止指定的游标。它不会等待ACID 事务提交。
方法 | 命令 | 注意 |
---|---|---|
可用于未分片的集合。 For sharded collections, use the aggregation pipeline with the
$group stage. See Distinct Operation. | ||
如果使用 | ||
如果在不存在的集合上运行,则会隐式创建该集合。 | ||
如果在不存在的集合上运行,则会隐式创建该集合。 | ||
如果在不存在的集合上运行,则会隐式创建该集合。 |
多文档事务中不允许影响数据库目录的操作,例如创建或删除集合或索引。例如,多文档事务不能包含会导致创建新集合的插入操作。请参阅受限操作。
事务中允许使用诸如 hello
、buildInfo
、connectionStatus
(及其辅助方法)之类的信息命令,但它们不能是事务中的第一项操作。
读取偏好
原子性(Atomicity)
当事务处于打开状态时,在事务之外无法查看事务中操作所执行的任何数据更改:
在事务提交时,事务中所做的所有数据更改都会保存,并且在事务之外可见。换言之,一个事务不会在回滚其他事务的同时提交某些更改。
在事务进行提交前,在事务中所做的数据更改在事务外不可见。
不过,当事务写入多个分片时,并非所有外部读取操作都需等待已提交事务的结果在各个分片上可见。例如,如果事务已提交并且写入 1 在分片 A 上可见,但写入 2 在分片 B 上尚不可见,则读关注
"local"
处的外部读取可以在不看到写入 2 的情况下读取写入 1 的结果。当事务中止时,事务中通过写入进行的所有数据更改都会丢弃且变得不可见,同时事务结束。
例子
考虑这样一种情况:当对hr
数据库中的员工记录进行更改时,您希望确保reporting
数据库中的events
集合与hr
更改同步。换言之,您希望确保这些写入作为单个事务完成,以便两个操作要么成功,要么失败。
hr
数据库中的 employees
集合包含以下文档:
{ "_id" : ObjectId("5af0776263426f87dd69319a"), "employee" : 3, "name" : { "title" : "Mr.", "name" : "Iba Ochs" }, "status" : "Active", "department" : "ABC" } { "_id" : ObjectId("5af0776263426f87dd693198"), "employee" : 1, "name" : { "title" : "Miss", "name" : "Ann Thrope" }, "status" : "Active", "department" : "ABC" } { "_id" : ObjectId("5af0776263426f87dd693199"), "employee" : 2, "name" : { "title" : "Mrs.", "name" : "Eppie Delta" }, "status" : "Active", "department" : "XYZ" }
reporting
数据库中的 events
集合包含以下文档:
{ "_id" : ObjectId("5af07daa051d92f02462644a"), "employee" : 1, "status" : { "new" : "Active", "old" : null }, "department" : { "new" : "ABC", "old" : null } } { "_id" : ObjectId("5af07daa051d92f02462644b"), "employee" : 2, "status" : { "new" : "Active", "old" : null }, "department" : { "new" : "XYZ", "old" : null } } { "_id" : ObjectId("5af07daa051d92f02462644c"), "employee" : 3, "status" : { "new" : "Active", "old" : null }, "department" : { "new" : "ABC", "old" : null } }
以下示例将打开一个事务,在 employees
状态中将员工的状态更新为 Inactive
,将相应文档插入到 events
集合,并将这两个操作作为单个事务提交。
// Runs the txnFunc and retries if TransientTransactionError encountered function runTransactionWithRetry(txnFunc, session) { while (true) { try { txnFunc(session); // performs transaction break; } catch (error) { // If transient error, retry the whole transaction if (error?.errorLabels?.includes("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ..."); continue; } else { throw error; } } } } // Retries commit if UnknownTransactionCommitResult encountered function commitWithRetry(session) { while (true) { try { session.commitTransaction(); // Uses write concern set at transaction start. print("Transaction committed."); break; } catch (error) { // Can retry commit if (error?.errorLabels?.includes("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ..."); continue; } else { print("Error during commit ..."); throw error; } } } } // Updates two collections in a transactions function updateEmployeeInfo(session) { employeesCollection = session.getDatabase("hr").employees; eventsCollection = session.getDatabase("reporting").events; session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } ); try{ employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } ); eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } ); } catch (error) { print("Caught exception during transaction, aborting."); session.abortTransaction(); throw error; } commitWithRetry(session); } // Start a session. session = db.getMongo().startSession( { readPreference: { mode: "primary" } } ); try{ runTransactionWithRetry(updateEmployeeInfo, session); } catch (error) { // Do something with error } finally { session.endSession(); }