Docs 菜单

db.collection.bulkWrite()

带驱动程序的 MongoDB

本页面提供 mongosh 方法的相关信息。要查看 MongoDB 驱动程序中的等效方法,请参阅编程语言的相应页面:

db.collection.bulkWrite()

执行多个写操作,并控制执行顺序。

返回:
  • 如果操作使用 写关注来运行,则布尔值 acknowledgedtrue;如果已禁用写关注,则为 false
  • 每个写入操作的计数。
  • 一个数组:为每份成功插入或更新的文档一个 _id

db.collection.bulkWrite() 在以下环境中托管的部署中可用:

注意

所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令

注意

您无法在 Atlas UI 中执行批量写入操作。要插入多个文档,必须插入一组文档。要了解更多信息,请参阅 Atlas 文档中的创建、查看、更新和删除文档

bulkWrite() 方法采用以下形式:

db.collection.bulkWrite(
[ <operation 1>, <operation 2>, ... ],
{
writeConcern : <document>,
ordered : <boolean>
}
)

bulkWrite() 方法使用以下参数:

Parameter
类型
说明

operations

阵列

bulkWrite()写入操作的大量。

有效操作包括:

有关每个操作的用法,请参阅写入操作

writeConcern

文档

可选。表达写关注的文档。省略以使用默认写关注。

如果是在事务中运行,则请勿显式设置此操作的写关注。要将写关注与事务一起使用,请参阅事务和写关注。

ordered

布尔

可选。一个布尔值,指定 mongod 实例应该执行有序操作还是无序操作。默认为 true

参阅操作的执行

bulkWrite() 获取一系列写入操作并执行每个写入操作。默认情况下,操作按顺序执行。有关控制写入操作执行顺序的信息,请参见执行操作

将单个文档插入集合中。

db.collection.bulkWrite( [
{ insertOne : { "document" : <document> } }
] )

请参阅 db.collection.insertOne()

updateOne 更新集合中与过滤器匹配的单个文档。如果存在多个匹配的文档,updateOne 将只更新第一个匹配的文档。

db.collection.bulkWrite( [
{ updateOne :
{
"filter": <document>,
"update": <document or pipeline>,
"upsert": <boolean>,
"collation": <document>,
"arrayFilters": [ <filterdocument1>, ... ],
"hint": <document|string>
}
}
] )

updateMany 更新集合中与过滤器匹配的所有文档。

db.collection.bulkWrite( [
{ updateMany :
{
"filter" : <document>,
"update" : <document or pipeline>,
"upsert" : <boolean>,
"collation": <document>,
"arrayFilters": [ <filterdocument1>, ... ],
"hint": <document|string>
}
}
] )
字段
注意

filter

更新的选择条件。可以使用与 db.collection.find() 方法中相同的查询选择器

update

要执行的更新操作。可以指定以下任一项:

upsert

可选。一个布尔值,表明是否执行更新插入操作。

默认情况下,upsertfalse

arrayFilters

选修的。大量过滤器文档,用于确定针对大量字段的更新操作要修改哪些大量元素。

collation

可选。指定用于操作的排序规则

hint

可选。用于支持更新filter索引。如果指定不存在的索引,则操作出错。

有关详细信息,请参阅 db.collection.updateOne()db.collection.updateMany()

replaceOne 将替换集合中与筛选器匹配的单个文档。如果存在多个匹配的文档,replaceOne 将只替换第一个匹配的文档。

db.collection.bulkWrite([
{ replaceOne :
{
"filter" : <document>,
"replacement" : <document>,
"upsert" : <boolean>,
"collation": <document>,
"hint": <document|string>
}
}
] )
字段
注意

filter

替换操作的选择条件。可以使用与 db.collection.find() 方法中相同的查询选择器

replacement

替换文档。文档不能包含更新操作符

upsert

可选。一个布尔值,表明是否执行更新插入操作。默认情况下,upsertfalse

collation

可选。指定用于操作的排序规则

hint

可选。用于支持更新filter索引。如果指定不存在的索引,则操作出错。

有关详细信息,请参阅 db.collection.replaceOne()

deleteOne 删除集合中与过滤器匹配的单个文档。如果存在多个匹配的文档,deleteOne 将只删除第一个匹配的文档。

db.collection.bulkWrite([
{ deleteOne : {
"filter" : <document>,
"collation" : <document> // Available starting in 3.4
} }
] )

deleteMany 删除集合中与过滤器匹配的所有文档。

db.collection.bulkWrite([
{ deleteMany: {
"filter" : <document>,
"collation" : <document> // Available starting in 3.4
} }
] )
字段
注意

filter

删除操作的选择条件。可以使用与 db.collection.find() 方法中相同的查询选择器

collation

可选。指定用于操作的排序规则

有关详细信息,请参阅 db.collection.deleteOne()db.collection.deleteMany()

如果文档未指定 _id 字段,则 mongod 会添加 _id 字段,并在插入或更新该文档之前为其分配唯一的 ObjectId() 字段。多数驱动程序都会创建一个 ObjectId 并插入 _id 字段,但如果驱动程序或应用程序不这样做,则 mongod 将创建并填充 _id

如果文档包含 _id 字段,则 _id 值在集合中必须是唯一的,以避免重复键错误。

更新或替换操作不能指定与原始文档不同的 _id 值。

参数 ordered 指定 bulkWrite() 是否按顺序执行操作。默认情况下,操作按顺序执行。

如下代码表示包含五个操作的 bulkWrite()

db.collection.bulkWrite(
[
{ insertOne : <document> },
{ updateOne : <document> },
{ updateMany : <document> },
{ replaceOne : <document> },
{ deleteOne : <document> },
{ deleteMany : <document> }
]
)

在默认的 ordered : true状态下,每个操作都会按顺序执行,从第一个操作 insertOne 到最后一个操作 deleteMany

如果将 ordered 设定为 false,则可通过 mongod 对操作重新排序,以提高性能。应用程序不应依赖于操作的执行顺序。

以下代码表示具有六个操作的无序 bulkWrite()

db.collection.bulkWrite(
[
{ insertOne : <document> },
{ updateOne : <document> },
{ updateMany : <document> },
{ replaceOne : <document> },
{ deleteOne : <document> },
{ deleteMany : <document> }
],
{ ordered : false }
)

如果使用 ordered : false,操作结果可能会有所不同。例如,deleteOnedeleteMany可能会删除更多或更少的文档,具体取决于是在insertOneupdateOneupdateManyreplaceOne操作之前还是之后运行。

每组中的操作次数不能超过数据库的maxWriteBatchSize值。 从 MongoDB 3.6 开始,此值为100,000 。 该值会显示在hello.maxWriteBatchSize字段中。

此限制可防止错误消息过大的问题。如果一个群组超过此限制,则客户端驱动程序会将该群组分成计数小于或等于限制值的更小群组。例如,对于 maxWriteBatchSize 值为 100,000 的情况,如果队列由 200,000 个操作组成,则驱动程序将创建 2 个群组,每个群组包含 100,000 个操作。

注意

驱动程序仅在使用高级 API 时才会将群组划分为更小的群组。 如果直接使用db.runCommand() (例如,在编写驱动程序时),MongoDB 在尝试执行超过限制的写入批处理时会引发错误。

从 MongoDB 3.6 开始,一旦单个批处理的错误报告变得太大,MongoDB 会将所有剩余错误消息截断为空字符串。 目前,一旦存在至少 2 条总大小大于1MB的错误消息,就会开始。

大小和分组机制是内部性能细节,在未来版本中可能会有变化。

在分片集合上执行操作的 ordered 列表通常比执行 unordered 列表慢,因为对于有序列表,每个操作都必须等待前一个操作完成。

bulkWrite() 写入操作在固定大小集合上使用时会受到限制。

updateOneupdateMany 将抛出一个 WriteError(如果 update 条件导致正在修改的文档的大小增大)。

replaceOne 如果 replacement 文档大于原始文档,则抛出 WriteError

deleteOnedeleteMany 如果用在固定大小集合上则会抛出 WriteError

bulkWrite() 写入操作在时间序列集合上使用时会受到限制。在时间序列集合上,只能使用 insertOne。所有其他操作都将返回一个 WriteError

db.collection.bulkWrite()出错时会抛出 BulkWriteError 异常。 请参阅 事务中的错误处理。

不包括写关注错误,有序操作在错误后停止,而无序操作继续处理队列中的任何剩余写入操作,除非在事务内运行。 请参阅事务中的错误处理。

写关注错误显示在 writeConcernErrors 字段中,所有其他错误显示在 writeErrors 字段中。如果遇到错误,将显示成功的写入操作的数量,而不是插入的 _id 值。有序操作会显示遇到的单个错误,而无序操作会显示数组中的每个错误。

如果您的集合使用模式验证并将validationAction设立为error ,则插入无效文档或使用无效值更新文档会引发错误。 执行operations大量中无效操作之前的操作并将其写入集合。 ordered字段决定是否执行剩余操作。

db.collection.bulkWrite() 可以在分布式事务中使用。

重要

在大多数情况下,与单文档写入操作相比,分布式事务会产生更高的性能成本,并且分布式事务的可用性不应取代有效的模式设计。在许多情况下,非规范化数据模型(嵌入式文档和数组)仍然是数据和使用案例的最佳选择。换言之,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。

有关其他事务使用注意事项(如运行时间限制和 oplog 大小限制),另请参阅生产注意事项

对于特征兼容性版本 (fcv) "4.4" 及更高版本,如果在事务中对不存在的集合执行了插入操作或更新操作 (upsert: true),则会隐式创建该集合。

注意

您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。

如果是在事务中运行,则请勿显式设置此操作的写关注。要将写关注与事务一起使用,请参阅事务和写关注。

从 MongoDB 4.2 开始,如果 db.collection.bulkWrite() 操作在事务内部遇到错误,则该方法会抛出 BulkWriteException(与在事务外部相同)。

在 4.0 中,如果 bulkWrite 操作在事务内遇到错误,则引发的错误不会包装为 BulkWriteException

在事务内部,批量写入中的第一个错误会导致整个批量写入失败并中止事务,即使批量写入是无序的也是如此。

了解 bulkWrite() 操作排序和错误处理非常重要。默认情况下,bulkWrite() 运行一个有序的操作列表:

  • 操作以串行方式执行。

  • 如果某个操作出错,将不会执行此操作以及任何后续操作。

  • 在错误操作完成之前列出的操作。

bulkWrite() 示例使用了 pizzas 集合:

db.pizzas.insertMany( [
{ _id: 0, type: "pepperoni", size: "small", price: 4 },
{ _id: 1, type: "cheese", size: "medium", price: 7 },
{ _id: 2, type: "vegan", size: "large", price: 8 }
] )

下面的 bulkWrite() 示例在 pizzas 集合上执行这些操作:

  • 使用 insertOne 添加两个文档。

  • 使用 updateOne 更新一个文档。

  • 使用 deleteOne 删除文档。

  • 使用 replaceOne 替换一个文档。

try {
db.pizzas.bulkWrite( [
{ insertOne: { document: { _id: 3, type: "beef", size: "medium", price: 6 } } },
{ insertOne: { document: { _id: 4, type: "sausage", size: "large", price: 10 } } },
{ updateOne: {
filter: { type: "cheese" },
update: { $set: { price: 8 } }
} },
{ deleteOne: { filter: { type: "pepperoni"} } },
{ replaceOne: {
filter: { type: "vegan" },
replacement: { type: "tofu", size: "small", price: 4 }
} }
] )
} catch( error ) {
print( error )
}

输出示例,包括已完成操作的摘要:

{
acknowledged: true,
insertedCount: 2,
insertedIds: { '0': 3, '1': 4 },
matchedCount: 2,
modifiedCount: 2,
deletedCount: 1,
upsertedCount: 0,
upsertedIds: {}
}

在运行上一个 bulkWrite() 示例之前,如果此集合已经包含一个 _id4 的文档,将为第二个 insertOne 操作返回如下重复键异常:

writeErrors: [
WriteError {
err: {
index: 1,
code: 11000,
errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 4 }',
op: { _id: 4, type: 'sausage', size: 'large', price: 10 }
}
}
],
result: BulkWriteResult {
result: {
ok: 1,
writeErrors: [
WriteError {
err: {
index: 1,
code: 11000,
errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 4 }',
op: { _id: 4, type: 'sausage', size: 'large', price: 10 }
}
}
],
writeConcernErrors: [],
insertedIds: [ { index: 0, _id: 3 }, { index: 1, _id: 4 } ],
nInserted: 1,
nUpserted: 0,
nMatched: 0,
nModified: 0,
nRemoved: 0,
upserted: []
}
}

由于 bulkWrite() 示例是有序的,因此仅完成了第一个 insertOne 操作。

要完成所有没有错误的操作,请运行 bulkWrite(),并将 ordered 设置为 false。有关示例,请参阅以下部分。

要指定无序 bulkWrite(),请将 ordered 设置为 false

在一个无序的 bulkWrite() 操作列表中:

  • 操作可以并行运行(不保证)。有关详细信息,请参阅有序操作和无序操作

  • 出现错误的操作未完成。

  • 所有无错误的操作都已完成。

继续 pizzas 集合示例,删除并重新创建此集合:

db.pizzas.insertMany( [
{ _id: 0, type: "pepperoni", size: "small", price: 4 },
{ _id: 1, type: "cheese", size: "medium", price: 7 },
{ _id: 2, type: "vegan", size: "large", price: 8 }
] )

在以下示例中:

  • bulkWrite()pizzas 集合运行无序操作。

  • 第二个 insertOne 操作具有与第一个 insertOne 相同的 _id,这样会产生一个重复键错误。

try {
db.pizzas.bulkWrite( [
{ insertOne: { document: { _id: 3, type: "beef", size: "medium", price: 6 } } },
{ insertOne: { document: { _id: 3, type: "sausage", size: "large", price: 10 } } },
{ updateOne: {
filter: { type: "cheese" },
update: { $set: { price: 8 } }
} },
{ deleteOne: { filter: { type: "pepperoni"} } },
{ replaceOne: {
filter: { type: "vegan" },
replacement: { type: "tofu", size: "small", price: 4 }
} }
],
{ ordered: false } )
} catch( error ) {
print( error )
}

输出示例,包括重复键错误和已完成操作的摘要:

writeErrors: [
WriteError {
err: {
index: 1,
code: 11000,
errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 3 }',
op: { _id: 3, type: 'sausage', size: 'large', price: 10 }
}
}
],
result: BulkWriteResult {
result: {
ok: 1,
writeErrors: [
WriteError {
err: {
index: 1,
code: 11000,
errmsg: 'E11000 duplicate key error collection: test.pizzas index: _id_ dup key: { _id: 3 }',
op: { _id: 3, type: 'sausage', size: 'large', price: 10 }
}
}
],
writeConcernErrors: [],
insertedIds: [ { index: 0, _id: 3 }, { index: 1, _id: 3 } ],
nInserted: 1,
nUpserted: 0,
nMatched: 2,
nModified: 2,
nRemoved: 1,
upserted: []
}
}

由于重复键错误,第二个 insertOne 操作失败。在无序 bulkWrite() 中,任何没有错误的操作都已完成。

继续 pizzas 集合示例,删除并重新创建此集合:

db.pizzas.insertMany( [
{ _id: 0, type: "pepperoni", size: "small", price: 4 },
{ _id: 1, type: "cheese", size: "medium", price: 7 },
{ _id: 2, type: "vegan", size: "large", price: 8 }
] )

以下 bulkWrite() 示例对 pizzas 集合运行操作,并设置 "majority" 写关注和 100 毫秒超时

try {
db.pizzas.bulkWrite( [
{ updateMany: {
filter: { size: "medium" },
update: { $inc: { price: 0.1 } }
} },
{ updateMany: {
filter: { size: "small" },
update: { $inc: { price: -0.25 } }
} },
{ deleteMany: { filter: { size: "large" } } },
{ insertOne: {
document: { _id: 4, type: "sausage", size: "small", price: 12 }
} } ],
{ writeConcern: { w: "majority", wtimeout: 100 } }
)
} catch( error ) {
print( error )
}

如果大多数副本集成员确认操作的时间超过 wtimeout,则该示例返回写关注错误和已完成操作的摘要:

result: BulkWriteResult {
result: {
ok: 1,
writeErrors: [],
writeConcernErrors: [
WriteConcernError {
err: {
code: 64,
codeName: 'WriteConcernFailed',
errmsg: 'waiting for replication timed out',
errInfo: { wtimeout: true, writeConcern: [Object] }
}
}
],
insertedIds: [ { index: 3, _id: 4 } ],
nInserted: 0,
nUpserted: 0,
nMatched: 2,
nModified: 2,
nRemoved: 0,
upserted: [],
opTime: { ts: Timestamp({ t: 1660329086, i: 2 }), t: Long("1") }
}
}