事务
Overview
在本指南中,您可以学习;了解如何使用Java Reactive Streams驾驶员来执行事务。 事务允许您运行一系列操作,这些操作在所有数据更改都成功之前才应用。 如果ACID 事务中的任何操作失败,驾驶员将取消ACID 事务并丢弃所有数据更改,而不会变得可见。
在MongoDB中,事务在逻辑会话中运行。 会话是您打算按顺序运行的一组相关读取或写入操作。 通过会话,您可以为一群组操作启用因果一致性,并运行ACID事务。 MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。
使用Java Reactive Streams驾驶员时,您可以从 MongoClient
实例创建一个新会话,并将其类型定义为ClientSession
。 我们建议您将客户端重复用于多个会话和事务,而不是每次都实例化一个新客户端。
警告
仅应将 ClientSession
与创建它的 MongoClient
(或关联的 MongoDatabase
或 MongoCollection
)一起使用。将 ClientSession
与其他 MongoClient
一起使用会导致操作错误。
样本数据
本指南中的示例使用Atlas示例数据集中的sample_restaurants.restaurants
和sample_mflix.movies
集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门。
重要
项目 Reactor 库
本指南使用 Project Reactor 库来使用Java Reactive Streams驾驶员方法返回的Publisher
实例。 要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 入门 在 Reactor 文档中。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB ”指南。
事务方法
在MongoClient
实例上使用startSession()
方法创建ClientSession
。 然后,您可以使用ClientSession
提供的方法修改会话状态。 下表详细介绍了可用于管理ACID 事务的方法:
方法 | 说明 |
---|---|
startTransaction() | Starts a new transaction, configured with the given options, on
this session. Throws an exception if there is already
a transaction in progress for the session. To learn more about
this method, see the startTransaction() page in the MongoDB Server manual. |
abortTransaction() | Ends the active transaction for this session. Throws an exception
if there is no active transaction for the session or if the
transaction is already committed or ended. To learn more about
this method, see the abortTransaction() page in the MongoDB Server manual. |
commitTransaction() | Commits the active transaction for this session. Throws an exception
if there is no active transaction for the session or if the
transaction was ended. To learn more about
this method, see the commitTransaction() page in the MongoDB Server manual. |
事务示例
以下示例演示了如何创建会话、创建ACID 事务以及在一个ACID 事务中将文档插入到多个集合中。 该代码执行以下步骤:
使用
startSession()
方法从客户端创建会话使用
startTransaction()
方法启动ACID 事务将文档插入
restaurants
和movies
集合使用
commitTransaction()
方法提交ACID 事务
MongoClient mongoClient = MongoClients.create(settings); MongoDatabase restaurantsDatabase = mongoClient.getDatabase("sample_restaurants"); MongoCollection<Document> restaurants = restaurantsDatabase.getCollection("restaurants"); MongoDatabase moviesDatabase = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> movies = moviesDatabase.getCollection("movies"); Mono.from(mongoClient.startSession()) .flatMap(session -> { // Begins the transaction session.startTransaction(); // Inserts documents in the given order return Mono.from(restaurants.insertOne(session, new Document("name", "Reactive Streams Pizza").append("cuisine", "Pizza"))) .then(Mono.from(movies.insertOne(session, new Document("title", "Java: Into the Streams").append("type", "Movie")))) // Commits the transaction .flatMap(result -> Mono.from(session.commitTransaction()) .thenReturn(result)) .onErrorResume(error -> Mono.from(session.abortTransaction()).then(Mono.error(error))) .doFinally(signalType -> session.close()); }) // Closes the client after the transaction completes .doFinally(signalType -> mongoClient.close()) // Prints the results of the transaction .subscribe( result -> System.out.println("Transaction succeeded"), error -> System.err.println("Transaction failed: " + error) );
更多信息
要了解有关本指南中提到的概念的更多信息,请参阅服务器手册中的以下页面:
API 文档
要进一步了解本指南所讨论的任何类型或方法,请参阅以下 API 文档: