Docs 菜单
Docs 主页
/ / /
Scala
/

执行事务

在此页面上

  • Overview
  • 方法
  • 事务示例
  • 更多信息
  • API 文档

在本指南中,您可以学习;了解如何使用Scala驾驶员来执行事务。事务允许您执行一系列仅在提交整个ACID 事务后才更改数据的操作。如果ACID 事务中的任何操作不成功,驾驶员会停止ACID 事务并在所有数据更改变得可见之前将其丢弃。此功能称为原子性。

在MongoDB中,事务在逻辑会话中运行。会话是要按顺序运行的一组相关读取或写入操作。会话可为一群组操作启用因果一致性,并允许您在符合ACID的ACID 事务中运行操作,该ACID 事务满足原子性、一致性、隔离性和持久性的预期。 MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。

使用Scala驾驶员时,可以通过在客户端上调用 startSession() 方法来启动 ClientSession。然后,您可以在会话中执行事务。

警告

仅在创建ClientSessionMongoClient上运行的操作中使用 。 将ClientSession与不同的MongoClient一起使用会导致操作错误。

调用 startSession() 方法启动会话后,您可以使用 ClientSession 类中的方法修改会话状态。下表描述了可用于管理ACID 事务的方法:

方法
说明

startTransaction()

Starts a new transaction on this session. You cannot start a transaction if there's already an active transaction running in the session.

You can set transaction options by passing a TransactionOptions instance as a parameter.

commitTransaction()

Commits the active transaction for this session. This method returns an error if there is no active transaction for the session, the transaction was previously ended, or if there is a write conflict.

abortTransaction()

Ends the active transaction for this session. This method returns an error if there is no active transaction for the session or if the transaction was committed or ended.

此示例定义了一个 runTransaction() 方法,用于修改 sample_mflix数据库集合中的数据。该代码执行以下操作:

  • 创建 MongoCollection 实例以访问权限moviesusers 集合

  • 指定ACID 事务的读关注和写关注(write concern)

  • 启动ACID 事务

  • 将文档插入 movies集合并打印结果

  • 更新 users集合中的文档并打印结果

def runTransaction(
database: MongoDatabase,
observable: SingleObservable[ClientSession]
): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val moviesCollection = database.getCollection("movies")
val usersCollection = database.getCollection("users")
val transactionOptions = TransactionOptions
.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
// Starts the transaction with specified options
clientSession.startTransaction(transactionOptions)
// Inserts a document into the "movies" collection
val insertObservable = moviesCollection.insertOne(
clientSession,
Document("name" -> "The Menu", "runtime" -> 107)
)
val insertResult = Await.result(insertObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
println(s"Insert completed: $insertResult")
// Updates a document in the "users" collection
val updateObservable = usersCollection.updateOne(
clientSession,
equal("name", "Amy Phillips"), set("name", "Amy Ryan")
)
val updateResult = Await.result(updateObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
println(s"Update completed: $updateResult")
clientSession
})
}

注意

在ACID 事务中,操作必须按顺序运行。前面的代码等待每个写入操作的结果,以确保这些操作不会同时运行。

然后,运行以下代码以执行ACID 事务。 此代码完成以下操作:

  • 使用startSession()方法从客户端创建会话

  • 调用上一示例中定义的 runTransaction() 方法,将数据库和会话作为参数传递

  • 通过调用 commitTransaction() 方法提交ACID 事务并等待操作完成

val client = MongoClient("<connection string>")
val database = client.getDatabase("sample_mflix")
val session = client.startSession();
val transactionObservable: SingleObservable[ClientSession] =
runTransaction(database, session)
val commitTransactionObservable: SingleObservable[Unit] =
transactionObservable.flatMap(clientSession => clientSession.commitTransaction())
Await.result(commitTransactionObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
Insert completed: AcknowledgedInsertOneResult{insertedId=BsonObjectId{value=...}}
Update completed: AcknowledgedUpdateResult{matchedCount=1, modifiedCount=1, upsertedId=null}

注意

不支持并行操作

Scala驾驶员不支持在单个ACID 事务中运行并行操作。

如果您使用的是MongoDB Server v8.0 或更高版本,则可以使用 方法在单个ACIDbulkWrite() 事务中对多个命名空间执行写入操作。有关更多信息,请参阅 批量写入操作指南。

要学习;了解有关本指南中提到的概念的更多信息,请参阅MongoDB Server手册中的以下页面:

  • 事务

  • 服务器会话

  • 读取隔离性、一致性和新近度

要学习;了解有关ACID compliance的更多信息,请参阅MongoDB网站上的《数据库管理系统中ACID属性指南》一文。

要了解有关插入操作的更多信息,请参阅插入文档指南。

要进一步了解本指南所提及的方法和类型,请参阅以下 API 文档:

后退

批量写入操作