Docs 菜单
Docs 主页
/
MongoDB Manual
/ / /

db.collection.watch()

在此页面上

  • 定义
  • 可用性
  • 部署
  • 引擎加密
  • 读关注 majority 支持
  • 行为
  • 可恢复性
  • 更新操作的完整文件查询
  • 访问控制
  • 游标迭代
  • 示例
  • 打开变更流
  • 通过完整文档更新查找 Change Stream
  • 附带文档前映像和后映像的变更流
  • 使用 Aggregation Pipeline 筛选器的 Change Stream
  • 恢复 Change Stream
db.collection.watch( pipeline, options )

重要

mongosh 方法

本页面提供 mongosh 方法的相关信息。这不是数据库命令或特定语言驱动程序(例如 Node.js)的相关文档。

有关数据库命令,请参阅附带 $changeStream 聚合阶段的 aggregate 命令。

如需了解 MongoDB API 驱动程序,请参阅特定语言的 MongoDB 驱动程序文档。

仅适用于副本集和分片集群

打开针对某一集合的变更流游标

Parameter
类型
说明
pipeline
阵列

可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:

指定用于筛选/修改变更事件输出的管道。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

options
文档
可选。用于修改 watch() 行为的其他选项。

options 文档可包含以下字段和值:

字段
类型
说明
resumeAfter
文档

可选。指示 watch() 尝试在恢复令牌中指定的操作之后开始恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

resumeAfterstartAfterstartAtOperationTime 互斥。

startAfter
文档

可选。指示 watch() 在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

startAfterresumeAfterstartAtOperationTime 互斥。

fullDocument
字符串

可选。默认情况下,watch() 返回通过更新操作修改的字段的增量,而不是整个更新后的文档。

fullDocument 设置为 "updateLookup",以指示 watch() 查找更新后的文档的最新多数提交版本。watch() 除了返回 updateDescription 增量以外,还会返回一个包含文档查询的 fullDocument 字段。

从 MongoDB 6.0 开始,您可将 fullDocument 设为:

  • "whenAvailable" 在插入、替换或更新文档之后输出文档后像(如果有)。

  • "required" 在插入、替换或更新文档之后输出文档后像。如果后像不可用,则引发错误。

fullDocumentBeforeChange
字符串

可选。

从 MongoDB 6.0 开始,您可以使用新的 fullDocumentBeforeChange 字段并将其设置为:

  • "whenAvailable" 在替换、更新或删除文档之前输出文档前像(如果有)。

  • "required" 在替换、更新或删除文档之前输出文档前像。如果前像不可用,则引发错误。

  • "off" 抑制文档前像。"off" 是默认值。

batchSize
int

可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。

功能与 cursor.batchSize() 相同。

maxAwaitTimeMS
int

可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000 毫秒。

collation
文档

可选。传递排序规则文档以指定变更流游标的排序规则。

如果省略,则默认为 simple 二进制比较。

showExpandedEvents
布尔

可选。从 MongoDB 6.0 开始,change stream 支持 DDL 事件的变更通知,如 createIndexesdropIndexes 事件。要在 change stream 中包含扩展事件,请使用showExpandedEvents选项创建 change stream 游标。

6.0 版本中的新功能

startAtOperationTime
时间戳

可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅 rs.printReplicationInfo()

startAtOperationTimeresumeAfterstartAfter 互斥。

返回:只要与 MongoDB 部署的连接保持打开集合存在,游标便会保持打开状态。请参阅变更事件,获取变更事件文档的示例。

提示

另请参阅:

db.collection.watch() 可用于副本集和分片集群部署:

只能将 db.collection.watch()Wired Tiger 存储引擎一起使用。

无论是否支持"majority" 读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority 禁用 读关注(read concern) 支持,以使用变更流。

  • db.collection.watch() 仅通知持续到大多数数据承载节点的数据更改。

  • 变更流游标保持打开状态,直到出现以下任一情况:

    • 游标已明确关闭。

    • 发生失效事件;例如删除或重命名集合。

    • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

    • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。

与 MongoDB 驱动程序不同,mongosh 不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次

db.collection.watch() 使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfterstartAfter 选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.collection.watch() 无法恢复变更流。

有关恢复变更流的更多信息,请参阅恢复变更流

注意

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。

注意

在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。

根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。

使用访问控制来运行时,用户必须对集合资源具有 findchangeStream 特权动作。换言之,用户必须具有能授予以下特权角色

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

内置 read 角色提供了相应的特权。

MongoDB 提供多种在游标上迭代的方法。

cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

以下操作针对 data.sensors 集合打开 change stream 游标:

watchCursor = db.getSiblingDB("data").sensors.watch()

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

如需变更流输出的完整文档,请参阅变更事件。

注意

不能将 isExhausted()变更流结合使用。

fullDocument 选项设置为 "updateLookup",从而指示变更流游标查找与更新变更流事件关联的文档的最新多数提交版本。

以下操作使用fullDocument : "updateLookup"选项打开针对data.sensors集合的 change stream 游标。

watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

对于任何更新操作,变更事件都会在 fullDocument 字段中返回文档查找的结果。

有关完整文档更新输出的示例,请参阅变更流更新事件

如需变更流输出的完整文档,请参阅变更事件。

从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):

如果图像属于以下情况,则前像和后像不可用于变更流事件

  • 在文档更新或删除操作时未对集合启用。

  • expireAfterSeconds 中设置的前像和后像保留时间后之后被删除。

    • 以下示例将整个集群上的 expireAfterSeconds 设置为 100 秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 以下示例返回当前的 changeStreamOptions 设置,包括 expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds 设置为 off 可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。

    • 如果变更流事件从 oplog 中删除,则无论 expireAfterSeconds 前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。

其他考量:

  • 启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。

  • 将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:

    • 将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如 updateDescription)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。

    • 如果其他变更流事件字段(例如 updateDescription)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。

    • 在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:

      • 文档更新仅影响文档结构或内容的一小部分,

      • 不会引起 replace 变更事件。replace 事件始终包含后像。

  • 如要请求前图像,请在 db.collection.watch() 中将 fullDocumentBeforeChange 设置为 requiredwhenAvailable。如要请求后图像,请使用相同方法设置 fullDocument

  • 前像被写入 config.system.preimages 集合。

    • config.system.preimages 集合可能会变大。要限制集合大小,可如前文所示为前映像设置 expireAfterSeconds 时间。

    • 前像由后台进程异步删除。

重要

向后不兼容的功能

从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod 命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。

提示

另请参阅:

创建启用 changeStreamPreAndPostImagestemperatureSensor 集合:

db.createCollection(
"temperatureSensor",
{ changeStreamPreAndPostImages: { enabled: true } }
)

用温度读数填充 temperatureSensor 集合:

db.temperatureSensor.insertMany( [
{ "_id" : 0, "reading" : 26.1 },
{ "_id" : 1, "reading" : 25.9 },
{ "_id" : 2, "reading" : 24.3 },
{ "_id" : 3, "reading" : 22.4 },
{ "_id" : 4, "reading" : 24.6 }
] )

以下部分显示了使用 temperatureSensor 集合的文档前映像和后映像的 change stream 示例。

使用 fullDocumentBeforeChange: "whenAvailable" 设置输出文档前像(如有)。前像是指被替换、更新或删除之前的文档。插入的文档没有前像。

以下示例使用 fullDocumentBeforeChange: "whenAvailable"temperatureSensor 集合创建变更流游标:

watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch(
[],
{ fullDocumentBeforeChange: "whenAvailable" }
)

以下示例使用游标检查新的变更流事件:

while ( !watchCursorFullDocumentBeforeChange.isClosed() ) {
if ( watchCursorFullDocumentBeforeChange.hasNext() ) {
printjson( watchCursorFullDocumentBeforeChange.next() );
}
}

在示例中:

  • while 循环将一直运行,直到游标关闭。

  • 如果该游标包含文档,则 hasNext() 返回 true

以下示例更新temperatureSensor文档的reading字段:

db.temperatureSensor.updateOne(
{ _id: 2 },
{ $set: { reading: 22.1 } }
)

更新 temperatureSensor 文档后,变更事件会在 fullDocumentBeforeChange 字段中输出文档前像。前像包含更新之前的 temperatureSensor文档reading 字段。例如:

{
"_id" : {
"_data" : "82624B21...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1649090957, 1),
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 2
},
"updateDescription" : {
"updatedFields" : {
"reading" : 22.1
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
},
"fullDocumentBeforeChange" : {
"_id" : 2,
"reading" : 24.3
}
}

提示

另请参阅:

使用 fullDocument: "whenAvailable" 设置输出文档后像(如果可用)。后像是指插入、替换或更新后的文档。已删除的文档没有后像。

以下示例使用 fullDocument: "whenAvailable"temperatureSensor 集合创建变更流游标:

watchCursorFullDocument = db.temperatureSensor.watch(
[],
{ fullDocument: "whenAvailable" }
)

以下示例使用游标检查新的变更流事件:

while ( !watchCursorFullDocument.isClosed() ) {
if ( watchCursorFullDocument.hasNext() ) {
printjson( watchCursorFullDocument.next() );
}
}

在示例中:

  • while 循环将一直运行,直到游标关闭。

  • 如果该游标包含文档,则 hasNext() 返回 true

以下示例更新temperatureSensor文档的reading字段:

db.temperatureSensor.updateOne(
{ _id: 1 },
{ $set: { reading: 29.5 } }
)

更新 temperatureSensor 文档后,变更事件会在 fullDocument 字段中输出文档后图像。后图像包含更新之后的 temperatureSensor 文档reading字段。例如:

{
"_id" : {
"_data" : "8262474D...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1648840090, 1),
"fullDocument" : {
"_id" : 1,
"reading" : 29.5
},
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 1
},
"updateDescription" : {
"updatedFields" : {
"reading" : 29.5
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
}
}

提示

另请参阅:

注意

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

以下操作使用聚合管道打开针对 data.sensors 集合的变更流游标,从而仅筛选 insert 事件:

watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}

变更流游标仅返回 operationTypeinsert 的变更事件。如需变更流输出的完整文档,请参阅变更事件

变更流游标返回的每份文档均包含一个恢复令牌作为 _id 字段。要恢复变更流,请将要恢复的变更事件的整个 _id 文档传递给 watch()resumeAfterstartAfter 选项。

以下操作使用恢复令牌恢复针对 data.sensors 集合的变更流游标。这个示例假设生成恢复令牌的操作尚未从集群的 oplog 中删除。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
watchCursor.close();
let resumeToken = firstChange._id;
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}

请参阅恢复变更流,获取恢复变更流的完整文档。

后退

db.collection.validate()