db.collection.bulkWrite()
带驱动程序的 MongoDB
本页面提供 mongosh
方法的相关信息。要查看 MongoDB 驱动程序中的等效方法,请参阅编程语言的相应页面:
定义
db.collection.bulkWrite()
执行多个写操作,并控制执行顺序。
返回: - 如果操作使用 写关注来运行,则布尔值
acknowledged
为true
;如果已禁用写关注,则为false
- 每个写入操作的计数。
- 一个数组:为每份成功插入或更新的文档一个
_id
。
- 如果操作使用 写关注来运行,则布尔值
兼容性
db.collection.bulkWrite()
在以下环境中托管的部署中可用:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
注意
所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令。
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
注意
您无法在 Atlas UI 中执行批量写入操作。要插入多个文档,必须插入一组文档。要了解更多信息,请参阅 Atlas 文档中的创建、查看、更新和删除文档。
语法
bulkWrite()
方法采用以下形式:
db.collection.bulkWrite( [ <operation 1>, <operation 2>, ... ], { writeConcern : <document>, ordered : <boolean> } )
bulkWrite()
方法使用以下参数:
Parameter | 类型 | 说明 |
---|---|---|
| 阵列 | |
| 文档 | |
| 布尔 |
行为
bulkWrite()
获取一系列写入操作并执行每个写入操作。默认情况下,操作按顺序执行。有关控制写入操作执行顺序的信息,请参见执行操作。
写入操作
insertOne
将单个文档插入集合中。
db.collection.bulkWrite( [ { insertOne : { "document" : <document> } } ] )
updateOne 和 updateMany
updateOne
更新集合中与过滤器匹配的单个文档。如果存在多个匹配的文档,updateOne
将只更新第一个匹配的文档。
db.collection.bulkWrite( [ { updateOne : { "filter": <document>, "update": <document or pipeline>, "upsert": <boolean>, "collation": <document>, "arrayFilters": [ <filterdocument1>, ... ], "hint": <document|string> } } ] )
updateMany
更新集合中与过滤器匹配的所有文档。
db.collection.bulkWrite( [ { updateMany : { "filter" : <document>, "update" : <document or pipeline>, "upsert" : <boolean>, "collation": <document>, "arrayFilters": [ <filterdocument1>, ... ], "hint": <document|string> } } ] )
字段 | 注意 |
---|---|
| 更新的选择条件。可以使用与 |
| |
| 可选。一个布尔值,表明是否执行更新插入操作。 默认情况下, |
| 选修的。大量过滤器文档,用于确定针对大量字段的更新操作要修改哪些大量元素。 |
| 可选。指定用于操作的排序规则。 |
| 可选。用于支持更新 |
有关详细信息,请参阅 db.collection.updateOne()
和 db.collection.updateMany()
。
replaceOne
replaceOne
将替换集合中与筛选器匹配的单个文档。如果存在多个匹配的文档,replaceOne
将只替换第一个匹配的文档。
db.collection.bulkWrite([ { replaceOne : { "filter" : <document>, "replacement" : <document>, "upsert" : <boolean>, "collation": <document>, "hint": <document|string> } } ] )
字段 | 注意 |
---|---|
| 替换操作的选择条件。可以使用与 |
| 替换文档。文档不能包含更新操作符。 |
| 可选。一个布尔值,表明是否执行更新插入操作。默认情况下, |
| 可选。指定用于操作的排序规则。 |
| 可选。用于支持更新 |
有关详细信息,请参阅 db.collection.replaceOne()
。
deleteOne 和 deleteMany
deleteOne
删除集合中与过滤器匹配的单个文档。如果存在多个匹配的文档,deleteOne
将只删除第一个匹配的文档。
db.collection.bulkWrite([ { deleteOne : { "filter" : <document>, "collation" : <document> // Available starting in 3.4 } } ] )
deleteMany
删除集合中与过滤器匹配的所有文档。
db.collection.bulkWrite([ { deleteMany: { "filter" : <document>, "collation" : <document> // Available starting in 3.4 } } ] )
字段 | 注意 |
---|---|
| 删除操作的选择条件。可以使用与 |
| 可选。指定用于操作的排序规则。 |
有关详细信息,请参阅 db.collection.deleteOne()
和 db.collection.deleteMany()
。
_id
字段
如果文档未指定 _id 字段,则 mongod
会添加 _id
字段,并在插入或更新该文档之前为其分配唯一的 ObjectId()
字段。多数驱动程序都会创建一个 ObjectId 并插入 _id
字段,但如果驱动程序或应用程序不这样做,则 mongod
将创建并填充 _id
。
如果文档包含 _id
字段,则 _id
值在集合中必须是唯一的,以避免重复键错误。
更新或替换操作不能指定与原始文档不同的 _id
值。
执行操作
参数 ordered
指定 bulkWrite()
是否按顺序执行操作。默认情况下,操作按顺序执行。
如下代码表示包含五个操作的 bulkWrite()
。
db.collection.bulkWrite( [ { insertOne : <document> }, { updateOne : <document> }, { updateMany : <document> }, { replaceOne : <document> }, { deleteOne : <document> }, { deleteMany : <document> } ] )
在默认的 ordered : true
状态下,每个操作都会按顺序执行,从第一个操作 insertOne
到最后一个操作 deleteMany
。
如果将 ordered
设定为 false,则可通过 mongod
对操作重新排序,以提高性能。应用程序不应依赖于操作的执行顺序。
以下代码表示具有六个操作的无序 bulkWrite()
:
db.collection.bulkWrite( [ { insertOne : <document> }, { updateOne : <document> }, { updateMany : <document> }, { replaceOne : <document> }, { deleteOne : <document> }, { deleteMany : <document> } ], { ordered : false } )
如果使用 ordered : false
,操作结果可能会有所不同。例如,deleteOne
或deleteMany
可能会删除更多或更少的文档,具体取决于是在insertOne
、updateOne
、updateMany
或replaceOne
操作之前还是之后运行。
每组中的操作次数不得超过数据库的 maxWriteBatchSize 值。maxWriteBatchSize
的默认值为 100,000
。该值会显示在 hello.maxWriteBatchSize
字段中。
此限制可防止错误消息过大的问题。如果一个群组超过此限制,则客户端驱动程序会将该群组分成计数小于或等于限制值的更小群组。例如,对于 maxWriteBatchSize
值为 100,000
的情况,如果队列由 200,000
个操作组成,则驱动程序将创建 2 个群组,每个群组包含 100,000
个操作。
注意
该驱动程序仅在使用高级 API 时会将上述群组划分为多个更小的群组。如果直接使用 db.runCommand()
(例如,在编写驱动程序时),MongoDB 在尝试执行超过限制的写入批处理时会引发错误。
如果单个批处理的错误报告变得过大,MongoDB 会将所有剩余的错误消息截断为空字符串。如果至少存在两条总大小大于 1MB
的错误消息,则会将其截断。
大小和分组机制是内部性能细节,在未来版本中可能会有变化。
在分片集合上执行操作的 ordered
列表通常比执行 unordered
列表慢,因为对于有序列表,每个操作都必须等待前一个操作完成。
固定大小集合
bulkWrite()
写入操作在固定大小集合上使用时会受到限制。
updateOne
和 updateMany
将抛出一个 WriteError
(如果 update
条件导致正在修改的文档的大小增大)。
replaceOne
如果 replacement
文档大于原始文档,则抛出 WriteError
。
deleteOne
和 deleteMany
如果用在固定大小集合上则会抛出 WriteError
。
时间序列集合
bulkWrite()
写入操作在时间序列集合上使用时会受到限制。在时间序列集合上,只能使用 insertOne
。所有其他操作都将返回一个 WriteError
。
Error Handling
db.collection.bulkWrite()
出错时会抛出 BulkWriteError
异常。 请参阅 事务中的错误处理。
当排除了写关注错误时,有序操作将在发生错误之后停止,而无序操作会继续处理队列中的任何剩余写操作(除非在事务内运行)。请参阅事务中的错误处理。
写关注错误显示在 writeConcernErrors
字段中,所有其他错误显示在 writeErrors
字段中。如果遇到错误,将显示成功的写入操作的数量,而不是插入的 _id
值。有序操作会显示遇到的单个错误,而无序操作会显示数组中的每个错误。
模式验证错误
如果您的集合使用模式验证并将validationAction
设立为error
,则插入无效文档或使用无效值更新文档会引发错误。 执行operations
大量中无效操作之前的操作并将其写入集合。 ordered
字段决定是否执行剩余操作。
事务
db.collection.bulkWrite()
可以在分布式事务中使用。
重要
在大多数情况下,与单文档写入操作相比,分布式事务会产生更高的性能成本,并且分布式事务的可用性不应取代有效的模式设计。在许多情况下,非规范化数据模型(嵌入式文档和数组)仍然是数据和使用案例的最佳选择。换言之,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。
有关其他事务使用注意事项(如运行时间限制和 oplog 大小限制),另请参阅生产注意事项。
在事务中插入和更新插入
对于特征兼容性版本 (fcv) "4.4"
及更高版本,如果在事务中对不存在的集合执行了插入操作或更新操作 (upsert: true
),则会隐式创建该集合。
注意
您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。
写关注和事务
如果是在事务中运行,则请勿显式设置此操作的写关注。要将写关注与事务一起使用,请参阅事务和写关注。
事务内部的错误处理
从 MongoDB 4.2 开始,如果 db.collection.bulkWrite()
操作在事务内部遇到错误,则该方法会抛出 BulkWriteException(与在事务外部相同)。
在 4.0 中,如果 bulkWrite
操作在事务内遇到错误,则引发的错误不会包装为 BulkWriteException
。
在事务内部,批量写入中的第一个错误会导致整个批量写入失败并中止事务,即使批量写入是无序的也是如此。
示例
有序批量写入示例
了解 bulkWrite()
操作排序和错误处理非常重要。默认情况下,bulkWrite()
运行一个有序的操作列表:
操作以串行方式执行。
如果某个操作出错,将不会执行此操作以及任何后续操作。
在错误操作完成之前列出的操作。
bulkWrite()
示例使用了 pizzas
集合:
db.pizzas.insertMany( [ { _id: 0, type: "pepperoni", size: "small", price: 4 }, { _id: 1, type: "cheese", size: "medium", price: 7 }, { _id: 2, type: "vegan", size: "large", price: 8 } ] )
下面的 bulkWrite()
示例在 pizzas
集合上执行这些操作:
使用
insertOne
添加两个文档。使用
updateOne
更新一个文档。使用
deleteOne
删除文档。使用
replaceOne
替换一个文档。
try { db.pizzas.bulkWrite( [ { insertOne: { document: { _id: 3, type: "beef", size: "medium", price: 6 } } }, { insertOne: { document: { _id: 4, type: "sausage", size: "large", price: 10 } } }, { updateOne: { filter: { type: "cheese" }, update: { $set: { price: 8 } } } }, { deleteOne: { filter: { type: "pepperoni"} } }, { replaceOne: { filter: { type: "vegan" }, replacement: { type: "tofu", size: "small", price: 4 } } } ] ) } catch( error ) { print( error ) }
输出示例,包括已完成操作的摘要:
{ acknowledged: true, insertedCount: 2, insertedIds: { '0': 3, '1': 4 }, matchedCount: 2, modifiedCount: 2, deletedCount: 1, upsertedCount: 0, upsertedIds: {} }
在运行上一个 bulkWrite()
示例之前,如果此集合已经包含一个 _id
为 4
的文档,将为第二个 insertOne
操作返回如下重复键异常:
writeErrors: [ WriteError { err: { index: 1, code: 11000, errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 4 }', op: { _id: 4, type: 'sausage', size: 'large', price: 10 } } } ], result: BulkWriteResult { result: { ok: 1, writeErrors: [ WriteError { err: { index: 1, code: 11000, errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 4 }', op: { _id: 4, type: 'sausage', size: 'large', price: 10 } } } ], writeConcernErrors: [], insertedIds: [ { index: 0, _id: 3 }, { index: 1, _id: 4 } ], nInserted: 1, nUpserted: 0, nMatched: 0, nModified: 0, nRemoved: 0, upserted: [] } }
由于 bulkWrite()
示例是有序的,因此仅完成了第一个 insertOne
操作。
要完成所有没有错误的操作,请运行 bulkWrite()
,并将 ordered
设置为 false
。有关示例,请参阅以下部分。
无序批量写入示例
要指定无序 bulkWrite()
,请将 ordered
设置为 false
。
在一个无序的 bulkWrite()
操作列表中:
操作可以并行运行(不保证)。有关详细信息,请参阅有序操作和无序操作。
出现错误的操作未完成。
所有无错误的操作都已完成。
继续 pizzas
集合示例,删除并重新创建此集合:
db.pizzas.insertMany( [ { _id: 0, type: "pepperoni", size: "small", price: 4 }, { _id: 1, type: "cheese", size: "medium", price: 7 }, { _id: 2, type: "vegan", size: "large", price: 8 } ] )
在以下示例中:
bulkWrite()
对pizzas
集合运行无序操作。第二个
insertOne
操作具有与第一个insertOne
相同的_id
,这样会产生一个重复键错误。
try { db.pizzas.bulkWrite( [ { insertOne: { document: { _id: 3, type: "beef", size: "medium", price: 6 } } }, { insertOne: { document: { _id: 3, type: "sausage", size: "large", price: 10 } } }, { updateOne: { filter: { type: "cheese" }, update: { $set: { price: 8 } } } }, { deleteOne: { filter: { type: "pepperoni"} } }, { replaceOne: { filter: { type: "vegan" }, replacement: { type: "tofu", size: "small", price: 4 } } } ], { ordered: false } ) } catch( error ) { print( error ) }
输出示例,包括重复键错误和已完成操作的摘要:
writeErrors: [ WriteError { err: { index: 1, code: 11000, errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 3 }', op: { _id: 3, type: 'sausage', size: 'large', price: 10 } } } ], result: BulkWriteResult { result: { ok: 1, writeErrors: [ WriteError { err: { index: 1, code: 11000, errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 3 }', op: { _id: 3, type: 'sausage', size: 'large', price: 10 } } } ], writeConcernErrors: [], insertedIds: [ { index: 0, _id: 3 }, { index: 1, _id: 3 } ], nInserted: 1, nUpserted: 0, nMatched: 2, nModified: 2, nRemoved: 1, upserted: [] } }
由于重复键错误,第二个 insertOne
操作失败。在无序 bulkWrite()
中,任何没有错误的操作都已完成。
带有写关注的批量写入示例
继续 pizzas
集合示例,删除并重新创建此集合:
db.pizzas.insertMany( [ { _id: 0, type: "pepperoni", size: "small", price: 4 }, { _id: 1, type: "cheese", size: "medium", price: 7 }, { _id: 2, type: "vegan", size: "large", price: 8 } ] )
以下 bulkWrite()
示例对 pizzas
集合运行操作,并设置 "majority"
写关注和 100 毫秒超时:
try { db.pizzas.bulkWrite( [ { updateMany: { filter: { size: "medium" }, update: { $inc: { price: 0.1 } } } }, { updateMany: { filter: { size: "small" }, update: { $inc: { price: -0.25 } } } }, { deleteMany: { filter: { size: "large" } } }, { insertOne: { document: { _id: 4, type: "sausage", size: "small", price: 12 } } } ], { writeConcern: { w: "majority", wtimeout: 100 } } ) } catch( error ) { print( error ) }
如果大多数副本集成员确认操作的时间超过 wtimeout
,则该示例返回写关注错误和已完成操作的摘要:
result: BulkWriteResult { result: { ok: 1, writeErrors: [], writeConcernErrors: [ WriteConcernError { err: { code: 64, codeName: 'WriteConcernFailed', errmsg: 'waiting for replication timed out', errInfo: { wtimeout: true, writeConcern: [Object] } } } ], insertedIds: [ { index: 3, _id: 4 } ], nInserted: 0, nUpserted: 0, nMatched: 2, nModified: 2, nRemoved: 0, upserted: [], opTime: { ts: Timestamp({ t: 1660329086, i: 2 }), t: Long("1") } } }