批量写入操作
Overview
在本指南中,您可以学习;了解如何使用批量写入操作在单个数据库调用中执行多个写入操作。
考虑这样一个场景:您要插入一个文档,更新多个其他文档,然后删除一个文档。 如果使用单独的方法,则每个操作都需要调用自己的数据库。
通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:
集合:您可以使用
MongoCollection.bulkWrite()
方法对单个集合执行批量写入。在此方法中,每种写入操作都需要至少一次数据库调用。 示例,MongoCollection.bulkWrite()
将多个更新操作放在一次调用中,但对数据库进行两次单独的调用以执行插入操作和替换操作。客户端:如果您的应用程序连接到MongoDB Server8.0 或更高版本,则可以使用
MongoClient.bulkWrite()
方法对同一集群中的多个集合和数据库执行批量写入操作。此方法在一次数据库调用中执行所有写入。
样本数据
本指南中的示例使用Atlas示例数据集中的
sample_restaurants.restaurants
集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门教程。
重要
项目 Reactor 库
本指南使用 Project Reactor 库来使用Java Reactive Streams驾驶员方法返回的Publisher
实例。 要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 入门 在 Reactor 文档中。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB ”指南。
集合批量写入
批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 List
的 WriteModel
文档传递给 MongoCollection.bulkWrite()
方法。 WriteModel
是表示写入操作的模型。
对于要执行的每个写入操作,请创建以下从 WriteModel
继承的类之一的实例:
InsertOneModel
UpdateOneModel
UpdateManyModel
ReplaceOneModel
DeleteOneModel
DeleteManyModel
以下部分介绍如何创建和使用上述类的实例。
插入操作
要执行插入操作,请创建InsertOneModel
的实例并传入要插入的文档。
以下示例创建了一个InsertOneModel
实例:
InsertOneModel<Document> operation = new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches"));
要插入多个文档,请为每个文档创建一个InsertOneModel
实例。
更新操作
要更新文档,请创建UpdateOneModel
的实例并传入以下参数:
查询过滤,指定用于匹配集合中文档的条件。
要执行的更新操作。 有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作符指南。
以下示例创建了一个UpdateOneModel
实例:
UpdateOneModel<Document> operation = new UpdateOneModel<>( eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads"));
如果多个文档与 UpdateOneModel
实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 UpdateOptions
实例中指定排序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
UpdateOptions options = UpdateOptions.sort(Sorts.ascending("_id"));
要更新多个文档,请创建UpdateManyModel
的实例并传入相同的参数。 UpdateManyModel
更新与查询过滤匹配的所有文档。
以下示例创建了一个UpdateManyModel
实例:
UpdateManyModel<Document> operation = new UpdateManyModel<>( eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads"));
替换操作
替换操作会删除指定文档中除_id
字段之外的所有字段和值,并替换为新字段和值。 要执行替换操作,请创建ReplaceOneModel
的实例并传入查询过滤以及要存储在匹配文档中的字段和值。
以下示例创建了一个ReplaceOneModel
实例:
ReplaceOneModel<Document> operation = new ReplaceOneModel<>( eq("name", "Original Pizza"), new Document("name", "Mongo's Pizza") .append("borough", "Manhattan"));
如果多个文档与 ReplaceOneModel
实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions
实例中指定排序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
ReplaceOptions options = ReplaceOptions.sort(Sorts.ascending("_id"));
提示
替换多个文档
要替换多个文档,请为每个文档创建一个ReplaceOneModel
实例。
删除操作
要删除文档,请创建DeleteOneModel
的实例并传入查询过滤,指定要删除的文档。 DeleteOneModel
仅删除与查询过滤匹配的第一个文档。
以下示例创建了一个DeleteOneModel
实例:
DeleteOneModel<Document> operation = new DeleteOneModel<>( eq("restaurant_id", "5678"));
要删除多个文档,请创建DeleteManyModel
的实例并传入查询过滤,指定要删除的文档。 DeleteManyModel
会删除与查询过滤匹配的所有文档。
以下示例创建了一个DeleteManyModel
实例:
DeleteManyModel<Document> operation = new DeleteManyModel<>( eq("name", "Mongo's Deli"));
执行批量操作
为要执行的每个操作定义 WriteModel
实例后,将这些实例的列表传递给 bulkWrite()
方法。 默认情况下,该方法按照列表中定义的顺序运行操作。
以下示例使用bulkWrite()
方法执行多个写入操作:
Publisher<BulkWriteResult> bulkWritePublisher = restaurants.bulkWrite( Arrays.asList(new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Manhattan") .append("restaurant_id", "1234")), new InsertOneModel<>(new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Brooklyn") .append("restaurant_id", "5678")), new UpdateManyModel<>(eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads")), new DeleteOneModel<>(eq("restaurant_id", "1234")))); BulkWriteResult bulkResult = Mono.from(bulkWritePublisher).block(); System.out.println(bulkResult.toString());
AcknowledgedBulkWriteResult{insertedCount=2, matchedCount=2, removedCount=1, modifiedCount=2, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=66a7e0a6c08025218b657208}}, BulkWriteInsert{index=1, id=BsonObjectId{value=66a7e0a6c08025218b657209}}]}
如果任何写入操作失败, Java Reactive Streams驾驶员会发出MongoBulkWriteException
信号,并且不会执行任何进一步的单个操作。 MongoBulkWriteException
BulkWriteError
包括可使用 方法访问的MongoBulkWriteException.getWriteErrors()
,其中提供了单个故障的详细信息。
注意
当Java Reactive Streams驾驶员运行批量操作时,它会使用运行该操作的集合的writeConcern
。 无论执行顺序如何,驾驶员在尝试所有操作后都会报告所有写关注(write concern)错误。
自定义批量写入
BulkWriteOptions
类包含修改bulkWrite()
方法行为的方法。 要使用BulkWriteOptions
类,请构造该类的新实例,然后调用其一个或多个方法来修改写入操作。 您可以将这些方法调用链接在一起。 要修改写入操作的行为,请将类实例作为最后一个参数传递给bulkWrite()
方法。
您可以使用BulkWriteOptions
类中的以下方法来修改写入方法。 所有方法都是可选的。
方法 | 说明 |
---|---|
| Specifies whether the bulk write operation bypasses document validation. This lets you
perform write operations on documents that don't meet the schema validation requirements, if any
exist. For more information about schema validation, see Schema
Validation in the MongoDB
Server manual. |
| Attaches a Bson comment to the operation. For more information, see the insert command
fields guide in the
MongoDB Server manual. |
| Attaches a String comment to the operation. For more information, see the insert command
fields guide in the
MongoDB Server manual. |
| Specifies a map of parameter names and values. Values must be constant or closed
expressions that don't reference document fields. For more information,
see the let statement in the
MongoDB Server manual. |
| If set to True , the driver performs the individual operations in the order
provided. If an individual operation fails, the driver will not execute any
subsequent individual operations.Defaults to True . |
以下示例调用上一示例中的bulkWrite()
方法,但将ordered
选项设置为False
:
Publisher<BulkWriteResult> bulkWritePublisher = restaurants.bulkWrite( Arrays.asList(new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Manhattan") .append("restaurant_id", "1234")), new InsertOneModel<>(new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Brooklyn") .append("restaurant_id", "5678")), new UpdateManyModel<>(eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads")), new DeleteOneModel<>(eq("restaurant_id", "1234"))), new BulkWriteOptions().ordered(false)); BulkWriteResult bulkResult = Mono.from(bulkWritePublisher).block(); System.out.println(bulkResult.toString());
AcknowledgedBulkWriteResult{insertedCount=2, matchedCount=2, removedCount=1, modifiedCount=2, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=66a7e03cce430c5854b6caf9}}, BulkWriteInsert{index=1, id=BsonObjectId{value=66a7e03cce430c5854b6cafa}}]}
如果无序批量写入中的任何写入操作失败, Java Reactive Streams驾驶员仅在尝试所有操作后才会报告错误。
注意
无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。
客户端批量写入
连接到运行MongoDB Server 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite()
方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite()
方法在一次调用中执行所有写入。
MongoClient.bulkWrite()
方法采用 ClientNamespacedWriteModel
实例列表来表示不同的写入操作。 您可以使用实例方法构造 ClientNamespacedWriteModel
接口的实例。 示例,ClientNamespacedInsertOneModel
的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne()
方法创建此模型。
下表描述了模型及其相应的实例方法。
模型 | 实例方法 | 说明 | 参数 |
---|---|---|---|
|
| 创建一个模型以将文档插入到 |
|
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以替换 |
|
|
| 创建模型以删除 |
|
|
| 创建模型以删除 |
|
以下部分提供了一些示例,说明如何创建模型和使用客户端bulkWrite()
方法。
插入操作
此示例演示如何创建包含插入两个文档的指令的模型。 将一个文档插入到 db.people
集合中,将另一文档插入到 db.things
集合中。 MongoNamespace
实例定义每个写入操作适用的目标数据库和集合。
ClientNamespacedInsertOneModel personToInsert = ClientNamespacedWriteModel .insertOne( new MongoNamespace("db", "people"), new Document("name", "Julia Smith") ); ClientNamespacedInsertOneModel thingToInsert = ClientNamespacedWriteModel .insertOne( new MongoNamespace("db", "things"), new Document("object", "washing machine") );
更新操作
以下示例展示如何使用 bulkWrite()
方法更新db.people
和 db.things
集合中的现有文档:
ClientNamespacedUpdateOneModel personUpdate = ClientNamespacedWriteModel .updateOne( new MongoNamespace("db", "people"), Filters.eq("name", "Freya Polk"), Updates.inc("age", 1) ); ClientNamespacedUpdateManyModel thingUpdate = ClientNamespacedWriteModel .updateMany( new MongoNamespace("db", "things"), Filters.eq("category", "electronic"), Updates.set("manufacturer", "Premium Technologies") );
此示例将 people
集合中 name
值为 "Freya Polk"
的文档中 age
字段的值递增 1
。它还将 things
集合中 category
值为 "electronic"
的所有文档的 manufacturer
字段的值设置为 "Premium Technologies"
。
如果多个文档与 ClientNamespacedUpdateOneModel
实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 ClientUpdateOneOptions 实例中指定排序顺序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
ClientUpdateOneOptions options = ClientUpdateOneOptions .clientUpdateOneOptions() .sort(Sorts.ascending("_id"));
替换操作
以下示例展示了如何创建模型来替换 db.people
和 db.things
集合中的现有文档:
ClientNamespacedReplaceOneModel personReplacement = ClientNamespacedWriteModel .replaceOne( new MongoNamespace("db", "people"), Filters.eq("_id", 1), new Document("name", "Frederic Hilbert") ); ClientNamespacedReplaceOneModel thingReplacement = ClientNamespacedWriteModel .replaceOne( new MongoNamespace("db", "things"), Filters.eq("_id", 1), new Document("object", "potato") );
此示例成功运行后,people
集合中 _id
值为 1
的文档将替换为新文档。 things
集合中 _id
值为 1
的文档将替换为新文档。
如果多个文档与 ClientNamespacedReplaceOneModel
实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在ClientReplaceOneOptions实例中指定排序顺序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
ClientReplaceOneOptions options = ClientReplaceOneOptions .clientReplaceOneOptions() .sort(Sorts.ascending("_id"));
执行批量操作
为要执行的每个操作定义 ClientNamespacedWriteModel
实例后,将这些实例的列表传递给客户端bulkWrite()
方法。 默认情况下,该方法按照指定的顺序运行操作。
以下示例使用bulkWrite()
方法执行多个写入操作:
MongoNamespace peopleNamespace = new MongoNamespace("db", "people"); MongoNamespace thingsNamespace = new MongoNamespace("db", "things"); List<ClientNamespacedWriteModel> bulkOperations = Arrays.asList( ClientNamespacedWriteModel .insertOne( peopleNamespace, new Document("name", "Corey Kopper") ), ClientNamespacedWriteModel .replaceOne( thingsNamespace, Filters.eq("_id", 1), new Document("object", "potato") ) ); Publisher<ClientBulkWriteResult> bulkWritePublisher = mongoClient .bulkWrite(bulkOperations); ClientBulkWriteResult clientBulkResult = Mono .from(bulkWritePublisher) .block(); System.out.println(clientBulkResult.toString());
AcknowledgedSummaryClientBulkWriteResult{insertedCount=1, matchedCount=1, ...}
如果任何写入操作失败,驾驶员都会引发 ClientBulkWriteException
,并且不会执行任何进一步的单个操作。 ClientBulkWriteException
包括可使用 ClientBulkWriteException.getWriteErrors()
方法访问的 BulkWriteError
,其中提供了单个故障的详细信息。
自定义批量写入
您可以将 ClientBulkWriteOptions
的实例传递给 bulkWrite()
方法,以自定义驾驶员执行批量写入操作的方式。
执行顺序
默认下,驾驶员会按照您指定的顺序运行批量操作中的各个操作,直到出现错误或操作成功完成。
但是,您可以在创建 ClientBulkWriteOptions
实例时将 false
传递给 ordered()
方法,以指示驾驶员以无序方式执行写入操作。 使用 unordered 选项时,产生错误的操作不会阻止驾驶员在批量写入操作中运行其他写入操作。
以下代码在 ClientBulkWriteOptions
的实例中将 ordered
选项设置为 false
,并执行批量写入操作以插入多个文档。
MongoNamespace namespace = new MongoNamespace("db", "people"); ClientBulkWriteOptions options = ClientBulkWriteOptions .clientBulkWriteOptions() .ordered(false); List<ClientNamespacedWriteModel> bulkOperations = Arrays.asList( ClientNamespacedWriteModel.insertOne( namespace, new Document("_id", 1).append("name", "Rudra Suraj") ), // Causes a duplicate key error ClientNamespacedWriteModel.insertOne( namespace, new Document("_id", 1).append("name", "Mario Bianchi") ), ClientNamespacedWriteModel.insertOne( namespace, new Document("name", "Wendy Zhang") ) ); Publisher<ClientBulkWriteResult> bulkWritePublisher = mongoClient .bulkWrite(bulkOperations, options);
即使插入具有重复键的文档的写入操作会导致错误,也会执行其他操作,因为写入操作是无序的。
更多信息
要了解如何执行单个写入操作,请参阅以下指南:
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: