Docs 菜单

批量写入操作

在本指南中,您可以学习;了解如何使用PyMongo执行批量操作。批量操作通过在单个方法中执行多个写入操作来减少对服务器的调用次数。

CollectionMongoClient 类均提供 bulk_write() 方法。在 Collection实例上调用 bulk_write() 时,您可以对单个集合执行多个写入操作。在 MongoClient实例上调用 bulk_write() 时,您可以跨多个命名空间执行批量写入。在MongoDB中,命名空间由数据库名称和集合名称组成,格式为 <database>.<collection>

重要

要对 MongoClient实例执行批量操作,请确保您的应用程序满足以下要求:

  • 使用PyMongo v4.9 或更高版本

  • 连接到MongoDB Server v8.0 或更高版本

本指南中的示例使用sample_restaurants.restaurants sample_mflix.moviesAtlas示例数据集中的 和 集合。要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅 PyMongo入门教程

对于要执行的每个写入操作,请创建以下操作类之一的实例:

  • InsertOne

  • UpdateOne

  • UpdateMany

  • ReplaceOne

  • DeleteOne

  • DeleteMany

然后,将这些实例的列表传递给bulk_write()方法。

重要

确保将写入操作类导入到应用程序文件中,如以下代码所示:

from pymongo import InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, DeleteMany

以下部分介绍如何创建上述类的实例,您可以使用这些实例执行集合和客户端端批量操作。

要执行插入操作,请创建 InsertOne 的实例并指定要插入的文档。将以下关键字参数传递给 InsertOne 构造函数:

  • namespace:要在其中插入文档的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。

  • document:要插入的文档。

以下示例创建了一个InsertOne实例:

operation = InsertOne(
namespace="sample_restaurants.restaurants",
document={
"name": "Mongo's Deli",
"cuisine": "Sandwiches",
"borough": "Manhattan",
"restaurant_id": "1234"
}
)

您还可以通过将自定义类的实例传递给构造函数来创建 InsertOne 的实例。如果您使用类型检查工具,这将提供额外的类型安全性。您传递的实例必须从 TypedDict 类继承。

注意

Python 3.7 及更早版本中的 TypedDict

TypedDict 类位于 typing 模块中,该模块仅在Python 3.8 及更高版本中可用。要在早期版本的Python中使用 TypedDict 类,请安装typing_extensions包。

以下示例使用自定义类构造一个 InsertOne实例,以提高类型安全性:

class Restaurant (TypedDict):
name: str
cuisine: str
borough: str
restaurant_id: str
operation = pymongo.InsertOne(Restaurant(
name="Mongo's Deli", cuisine="Sandwiches", borough="Manhattan", restaurant_id="1234"))

要插入多个文档,请为每个文档创建一个InsertOne实例。

注意

_id字段必须是唯一的

在MongoDB集合中,每个文档都必须包含具有唯一值的 _id字段。

如果为 _id字段指定值,则必须确保该值在集合中是唯一的。如果不指定值,驾驶员会自动为该字段生成唯一的 ObjectId 值。

我们建议让驾驶员自动生成 _id 值以确保唯一性。重复的 _id 值违反了唯一索引约束,导致驾驶员返回错误。

要更新文档,请创建UpdateOne的实例并传入以下参数:

  • namespace:执行更新的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。

  • filter查询过滤,指定用于匹配集合中的文档的条件。

  • update:要执行的更新。有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作指南。

UpdateOne 更新与查询筛选器匹配的第一个文档。

以下示例创建了一个UpdateOne实例:

operation = UpdateOne(
namespace="sample_restaurants.restaurants",
filter={ "name": "Mongo's Deli" },
update={ "$set": { "cuisine": "Sandwiches and Salads" }}
)

要更新多个文档,请创建UpdateMany的实例并传入相同的参数。 UpdateMany更新与查询过滤匹配的所有文档。

以下示例创建了一个UpdateMany实例:

operation = UpdateMany(
namespace="sample_restaurants.restaurants",
filter={ "name": "Mongo's Deli" },
update={ "$set": { "cuisine": "Sandwiches and Salads" }}
)

替换操作会删除指定文档的所有字段和值,然后替换为新的字段和值。要执行替换操作,请创建 ReplaceOne 的实例并传入以下参数:

  • namespace:要在其中执行替换操作的命名空间。如果对单个集合执行批量操作,则此参数是可选的。

  • filter查询过滤,指定用于匹配要替换的文档的条件。

  • replacement:包含要存储在匹配文档中的新字段和值的文档。

以下示例创建了一个ReplaceOne实例:

operation = ReplaceOne(
namespace="sample_restaurants.restaurants",
filter={ "restaurant_id": "1234" },
replacement={
"name": "Mongo's Pizza",
"cuisine": "Pizza",
"borough": "Brooklyn",
"restaurant_id": "5678"
}
)

您还可以通过将自定义类的实例传递给构造函数来创建 ReplaceOne 的实例。如果您使用类型检查工具,这将提供额外的类型安全性。您传递的实例必须从 TypedDict 类继承。

注意

Python 3.7 及更早版本中的 TypedDict

TypedDict 类位于 typing 模块中,该模块仅在Python 3.8 及更高版本中可用。要在早期版本的Python中使用 TypedDict 类,请安装typing_extensions包。

以下示例使用自定义类构造了一个 ReplaceOne实例,以提高类型安全性:

class Restaurant (TypedDict):
name: str
cuisine: str
borough: str
restaurant_id: str
operation = pymongo.ReplaceOne(
{ "restaurant_id": "1234" },
Restaurant(name="Mongo's Pizza", cuisine="Pizza", borough="Brooklyn", restaurant_id="5678")
)

要替换多个文档,必须为每个文档创建一个ReplaceOne实例。

提示

类型检查工具

要学习;了解有关Python可用的类型检查工具的更多信息,请参阅“工具”页面上的“类型检查器”。

要删除文档,请创建 DeleteOne 的实例并传入以下参数:

  • namespace:要删除文档的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。

  • filter查询过滤,指定用于匹配要删除的文档的条件。

DeleteOne 仅删除与查询过滤匹配的第一个文档。

以下示例创建了一个DeleteOne实例:

operation = DeleteOne(
namespace="sample_restaurants.restaurants",
filter={ "restaurant_id": "5678" }
)

要删除多个文档,请创建 DeleteMany实例并传入命名空间和查询过滤,指定要删除的文档。DeleteMany 会删除与查询过滤匹配的所有文档。

以下示例创建了一个DeleteMany实例:

operation = DeleteMany(
namespace="sample_restaurants.restaurants",
filter={ "name": "Mongo's Deli" }
)

为要执行的每个操作定义类实例后,将这些实例的列表传递给 bulk_write() 方法。在 Collection实例上调用 bulk_write() 方法以写入单个集合,或在 MongoClient实例上调用 方法以写入多个命名空间。

如果对 Collection 调用的任何写入操作失败, PyMongo将引发 BulkWriteError 并且不会执行任何进一步的操作。BulkWriteError 提供了一个 details 属性,其中包括失败的操作以及有关异常的详细信息。

如果对 MongoClient 调用的任何写入操作失败, PyMongo将引发 ClientBulkWriteException 并且不会执行任何进一步的操作。ClientBulkWriteException 提供了 error 属性,其中包括有关异常的信息。

注意

当PyMongo运行批量操作时,它会使用运行该操作的集合或客户端的 write_concern。您还可以在使用 MongoClient.bulk_write() 方法时为操作设立写关注(write concern)。无论执行顺序如何,驾驶员在尝试所有操作后都会报告所有写关注(write concern)错误。

要学习;了解有关写入关注的更多信息,请参阅MongoDB Server手册中的写关注

以下示例通过在 Collection实例上使用 bulk_write() 方法对 restaurants集合执行多个写入操作:

operations = [
InsertOne(
document={
"name": "Mongo's Deli",
"cuisine": "Sandwiches",
"borough": "Manhattan",
"restaurant_id": "1234"
}
),
InsertOne(
document={
"name": "Mongo's Deli",
"cuisine": "Sandwiches",
"borough": "Brooklyn",
"restaurant_id": "5678"
}
),
UpdateMany(
filter={ "name": "Mongo's Deli" },
update={ "$set": { "cuisine": "Sandwiches and Salads" }}
),
DeleteOne(
filter={ "restaurant_id": "1234" }
)
]
results = restaurants.bulk_write(operations)
print(results)
BulkWriteResult({'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 2,
'nUpserted': 0, 'nMatched': 2, 'nModified': 2, 'nRemoved': 1, 'upserted': []},
acknowledged=True)

以下示例通过在 MongoClient实例上使用 bulk_write() 方法,对 sample_restaurants.restaurantssample_mflix.movies 命名空间执行多个写入操作:

operations = [
InsertOne(
namespace="sample_mflix.movies",
document={
"title": "Minari",
"runtime": 217,
"genres": ["Drama", "Comedy"]
}
),
UpdateOne(
namespace="sample_mflix.movies",
filter={ "title": "Minari" },
update={ "$set": { "runtime": 117 }}
),
DeleteMany(
namespace="sample_restaurants.restaurants",
filter={ "cuisine": "French" }
)
]
results = client.bulk_write(operations)
print(results)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [],
'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1,
'nModified': 1, 'nDeleted': 344, 'insertResults': {}, 'updateResults': {},
'deleteResults': {}}, acknowledged=True, verbose=False)

bulk_write() 方法可以选择接受其他参数,这些参数表示可用于配置批量写入操作的选项。

下表描述了可以传递给 Collection.bulk_write() 方法的选项:

属性
说明

ordered

If True, the driver performs the write operations in the order provided. If an error occurs, the remaining operations are not attempted.

If False, the driver performs the operations in an arbitrary order and attempts to perform all operations.
Defaults to True.

bypass_document_validation

Specifies whether the operation bypasses document-level validation. For more information, see Schema Validation in the MongoDB Server manual.
Defaults to False.

session

An instance of ClientSession. For more information, see the API documentation.

comment

A comment to attach to the operation. For more information, see the delete command fields guide in the MongoDB Server manual.

let

A map of parameter names and values. Values must be constant or closed expressions that don't reference document fields. For more information, see the let statement in the MongoDB Server manual.

以下示例调用前面的集合批量写入示例中的 bulk_write() 方法,但将 ordered 选项设置为 False

results = restaurants.bulk_write(operations, ordered=False)

如果无序批量写入中的任何写入操作失败, PyMongo仅在尝试所有操作后才会报告错误。

注意

无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。

下表描述了可以传递给 MongoClient.bulk_write() 方法的选项:

属性
说明

session

An instance of ClientSession. For more information, see the API documentation.

ordered

If True, the driver performs the write operations in the order provided. If an error occurs, the remaining operations are not attempted.

If False, the driver performs the operations in an arbitrary order and attempts to perform all operations.
Defaults to True.

verbose_results

Specifies whether the operation returns detailed results for each successful operation.
Defaults to False.

bypass_document_validation

Specifies whether the operation bypasses document-level validation. For more information, see Schema Validation in the MongoDB Server manual.
Defaults to False.

comment

A comment to attach to the operation. For more information, see the delete command fields guide in the MongoDB Server manual.

let

A map of parameter names and values. Values must be constant or closed expressions that don't reference document fields. For more information, see the let statement in the MongoDB Server manual.

write_concern

Specifies the write concern to use for the bulk operation. For more information, see Write Concern in the MongoDB Server manual.

以下示例调用前面客户端批量写入示例中的 bulk_write() 方法,但将 verbose_results 选项设置为 True

results = client.bulk_write(operations, verbose_results=True)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [],
'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1, 'nModified': 1,
'nDeleted': 344, 'insertResults': {0: InsertOneResult(ObjectId('...'),
acknowledged=True)}, 'updateResults': {1: UpdateResult({'ok': 1.0, 'idx': 1, 'n': 1,
'nModified': 1}, acknowledged=True)}, 'deleteResults': {2: DeleteResult({'ok': 1.0,
'idx': 2, 'n': 344}, acknowledged=True)}}, acknowledged=True, verbose=True)

本节介绍以下批量操作方法的返回值:

Collection.bulk_write()方法返回一个BulkWriteResult对象。 BulkWriteResult对象包含以下属性:

属性
说明

acknowledged

Indicates if the server acknowledged the write operation.

bulk_api_result

The raw bulk API result returned by the server.

deleted_count

The number of documents deleted, if any.

inserted_count

The number of documents inserted, if any.

matched_count

The number of documents matched for an update, if applicable.

modified_count

The number of documents modified, if any.

upserted_count

The number of documents upserted, if any.

upserted_ids

A map of the operation's index to the _id of the upserted documents, if applicable.

MongoClient.bulk_write()方法返回一个ClientBulkWriteResult对象。 ClientBulkWriteResult对象包含以下属性:

属性
说明

acknowledged

Indicates if the server acknowledged the write operation.

bulk_api_result

The raw bulk API result returned by the server.

delete_results

A map of any successful delete operations and their results.

deleted_count

The number of documents deleted, if any.

has_verbose_results

Indicates whether the returned results are verbose.

insert_results

A map of any successful insert operations and their results.

inserted_count

The number of documents inserted, if any.

matched_count

The number of documents matched for an update, if applicable.

modified_count

The number of documents modified, if any.

update_results

A map of any successful update operations and their results.

upserted_count

The number of documents upserted, if any.

如果没有为 MongoClient对象添加类型注解,类型检查器可能会显示类似于以下内容的错误:

from pymongo import MongoClient
client = MongoClient() # error: Need type annotation for "client"

解决方案是将 MongoClient对象注释为 client: MongoClientclient: MongoClient[Dict[str, Any]]

如果您指定 MongoClient 作为类型提示,但不包含文档、键和值的数据类型,则类型检查器可能会显示类似于以下内容的错误:

error: Dict entry 0 has incompatible type "str": "int";
expected "Mapping[str, Any]": "int"

解决方案是将以下类型提示添加到 MongoClient对象:

``client: MongoClient[Dict[str, Any]]``

要了解如何执行单个写入操作,请参阅以下指南:

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: