执行事务
Overview
在本指南中,您可以学习;了解如何使用Scala驾驶员来执行事务。事务允许您执行一系列仅在提交整个ACID 事务后才更改数据的操作。如果ACID 事务中的任何操作不成功,驾驶员会停止ACID 事务并在所有数据更改变得可见之前将其丢弃。此功能称为原子性。
在MongoDB中,事务在逻辑会话中运行。会话是要按顺序运行的一组相关读取或写入操作。会话可为一群组操作启用因果一致性,并允许您在符合ACID的ACID 事务中运行操作,该ACID 事务满足原子性、一致性、隔离性和持久性的预期。 MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。
使用Scala驾驶员时,可以通过在客户端上调用 startSession()
方法来启动 ClientSession
。然后,您可以在会话中执行事务。
警告
仅在创建ClientSession
的MongoClient
上运行的操作中使用 。 将ClientSession
与不同的MongoClient
一起使用会导致操作错误。
方法
调用 startSession()
方法启动会话后,您可以使用 ClientSession
类中的方法修改会话状态。下表描述了可用于管理ACID 事务的方法:
方法 | 说明 |
---|---|
| Starts a new transaction on this session. You cannot start a
transaction if there's already an active transaction running in
the session. You can set transaction options by passing a TransactionOptions
instance as a parameter. |
| Commits the active transaction for this session. This method returns an
error if there is no active transaction for the session, the
transaction was previously ended, or if there is a write conflict. |
| Ends the active transaction for this session. This method returns an
error if there is no active transaction for the session or if the
transaction was committed or ended. |
事务示例
此示例定义了一个 runTransaction()
方法,用于修改 sample_mflix
数据库集合中的数据。该代码执行以下操作:
创建
MongoCollection
实例以访问权限movies
和users
集合指定ACID 事务的读关注和写关注(write concern)
启动ACID 事务
将文档插入
movies
集合并打印结果更新
users
集合中的文档并打印结果
def runTransaction( database: MongoDatabase, observable: SingleObservable[ClientSession] ): SingleObservable[ClientSession] = { observable.map(clientSession => { val moviesCollection = database.getCollection("movies") val usersCollection = database.getCollection("users") val transactionOptions = TransactionOptions .builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() // Starts the transaction with specified options clientSession.startTransaction(transactionOptions) // Inserts a document into the "movies" collection val insertObservable = moviesCollection.insertOne( clientSession, Document("name" -> "The Menu", "runtime" -> 107) ) val insertResult = Await.result(insertObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Insert completed: $insertResult") // Updates a document in the "users" collection val updateObservable = usersCollection.updateOne( clientSession, equal("name", "Amy Phillips"), set("name", "Amy Ryan") ) val updateResult = Await.result(updateObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Update completed: $updateResult") clientSession }) }
注意
在ACID 事务中,操作必须按顺序运行。前面的代码等待每个写入操作的结果,以确保这些操作不会同时运行。
然后,运行以下代码以执行ACID 事务。 此代码完成以下操作:
使用
startSession()
方法从客户端创建会话调用上一示例中定义的
runTransaction()
方法,将数据库和会话作为参数传递通过调用
commitTransaction()
方法提交ACID 事务并等待操作完成
val client = MongoClient("<connection string>") val database = client.getDatabase("sample_mflix") val session = client.startSession(); val transactionObservable: SingleObservable[ClientSession] = runTransaction(database, session) val commitTransactionObservable: SingleObservable[Unit] = transactionObservable.flatMap(clientSession => clientSession.commitTransaction()) Await.result(commitTransactionObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
Insert completed: AcknowledgedInsertOneResult{insertedId=BsonObjectId{value=...}} Update completed: AcknowledgedUpdateResult{matchedCount=1, modifiedCount=1, upsertedId=null}
更多信息
要学习;了解有关本指南中提到的概念的更多信息,请参阅MongoDB Server手册中的以下页面:
要学习;了解有关ACID compliance的更多信息,请参阅MongoDB网站上的《数据库管理系统中ACID属性指南》一文。
要了解有关插入操作的更多信息,请参阅插入文档指南。
API 文档
要进一步了解本指南所提及的方法和类型,请参阅以下 API 文档: