Docs 菜单
Docs 主页
/ / /
Java Reactive Streams 驱动程序
/

事务

在此页面上

  • Overview
  • 样本数据
  • 事务方法
  • 事务示例
  • 更多信息
  • API 文档

在本指南中,您可以学习;了解如何使用Java Reactive Streams驾驶员来执行事务。 事务允许您运行一系列操作,这些操作在所有数据更改都成功之前才应用。 如果ACID 事务中的任何操作失败,驾驶员将取消ACID 事务并丢弃所有数据更改,而不会变得可见。

在MongoDB中,事务在逻辑会话中运行。 会话是您打算按顺序运行的一组相关读取或写入操作。 通过会话,您可以为一群组操作启用因果一致性,并运行ACID事务。 MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。

使用Java Reactive Streams驾驶员时,您可以从 MongoClient实例创建一个新会话,并将其类型定义为ClientSession 。 我们建议您将客户端重复用于多个会话和事务,而不是每次都实例化一个新客户端。

警告

仅应将 ClientSession 与创建它的 MongoClient(或关联的 MongoDatabaseMongoCollection)一起使用。将 ClientSession 与其他 MongoClient 一起使用会导致操作错误。

本指南中的示例使用Atlas示例数据集中sample_restaurants.restaurantssample_mflix.movies集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门。

重要

项目 Reactor 库

本指南使用 Project Reactor 库来使用Java Reactive Streams驾驶员方法返回的Publisher实例。 要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 入门 在 Reactor 文档中。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB ”指南。

MongoClient实例上使用startSession()方法创建ClientSession 。 然后,您可以使用ClientSession提供的方法修改会话状态。 下表详细介绍了可用于管理ACID 事务的方法:

方法
说明
startTransaction()
Starts a new transaction, configured with the given options, on this session. Throws an exception if there is already a transaction in progress for the session. To learn more about this method, see the startTransaction() page in the MongoDB Server manual.
abortTransaction()
Ends the active transaction for this session. Throws an exception if there is no active transaction for the session or if the transaction is already committed or ended. To learn more about this method, see the abortTransaction() page in the MongoDB Server manual.
commitTransaction()
Commits the active transaction for this session. Throws an exception if there is no active transaction for the session or if the transaction was ended. To learn more about this method, see the commitTransaction() page in the MongoDB Server manual.

以下示例演示了如何创建会话、创建ACID 事务以及在一个ACID 事务中将文档插入到多个集合中。 该代码执行以下步骤:

  1. 使用startSession()方法从客户端创建会话

  2. 使用startTransaction()方法启动ACID 事务

  3. 将文档插入restaurantsmovies集合

  4. 使用commitTransaction()方法提交ACID 事务

MongoClient mongoClient = MongoClients.create(settings);
MongoDatabase restaurantsDatabase = mongoClient.getDatabase("sample_restaurants");
MongoCollection<Document> restaurants = restaurantsDatabase.getCollection("restaurants");
MongoDatabase moviesDatabase = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> movies = moviesDatabase.getCollection("movies");
Mono.from(mongoClient.startSession())
.flatMap(session -> {
// Begins the transaction
session.startTransaction();
// Inserts documents in the given order
return Mono.from(restaurants.insertOne(session, new Document("name", "Reactive Streams Pizza").append("cuisine", "Pizza")))
.then(Mono.from(movies.insertOne(session, new Document("title", "Java: Into the Streams").append("type", "Movie"))))
// Commits the transaction
.flatMap(result -> Mono.from(session.commitTransaction())
.thenReturn(result))
.onErrorResume(error -> Mono.from(session.abortTransaction()).then(Mono.error(error)))
.doFinally(signalType -> session.close());
})
// Closes the client after the transaction completes
.doFinally(signalType -> mongoClient.close())
// Prints the results of the transaction
.subscribe(
result -> System.out.println("Transaction succeeded"),
error -> System.err.println("Transaction failed: " + error)
);

要了解有关本指南中提到的概念的更多信息,请参阅服务器手册中的以下页面:

要进一步了解本指南所讨论的任何类型或方法,请参阅以下 API 文档:

后退

批量写入操作