db.watch()
定义
db.watch( pipeline, options )
仅适用于副本集和分片集群
打开数据库的变更流游标,以报告其所有非
system
集合的信息。Parameter类型说明pipeline
阵列可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:
指定用于筛选/修改变更事件输出的管道。
options
文档可选。 用于修改db.watch()
行为的其他选项。options
文档可包含以下字段和值:字段类型说明resumeAfter
文档可选。指定一个恢复令牌作为变更流的逻辑起点。无法用于在
invalidate
事件之后恢复变更流。resumeAfter
与startAfter
和startAtOperationTime
互斥。startAfter
文档可选。指定一个恢复令牌作为变更流的逻辑起点。与
resumeAfter
不同,startAfter
可在出现invalidate
事件之后通过创建新的变更流来恢复通知。startAfter
与resumeAfter
和startAtOperationTime
互斥。fullDocument
字符串可选。默认情况下,
db.watch()
返回通过更新操作修改的字段的增量,而不是整个更新后的文档。将
fullDocument
设置为"updateLookup"
,以指示db.watch()
查找更新后的文档的最新多数提交版本。db.watch()
除了返回updateDescription
增量以外,还会返回一个包含文档查询的fullDocument
字段。从 MongoDB 6.0 开始,您可将
fullDocument
设为:"whenAvailable"
在插入、替换或更新文档之后输出文档后像(如果有)。"required"
在插入、替换或更新文档之后输出文档后像。如果后像不可用,则引发错误。
fullDocumentBeforeChange
字符串可选。默认值为
"off"
。从 MongoDB 6.0 开始,您可以使用新的
fullDocumentBeforeChange
字段并将其设置为:"whenAvailable"
在替换、更新或删除文档之前输出文档前像(如果有)。"required"
在替换、更新或删除文档之前输出文档前像。如果前像不可用,则引发错误。"off"
抑制文档前像。"off"
是默认值。
batchSize
int可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。
功能与
cursor.batchSize()
相同。maxAwaitTimeMS
int可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。
默认值为
1000
毫秒。collation
文档可选。传递排序规则文档以指定变更流游标的排序规则。
如果省略,则默认为
simple
二进制比较。startAtOperationTime
时间戳可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
rs.printReplicationInfo()
。startAtOperationTime
与resumeAfter
和startAfter
互斥。返回: 游标位于更改事件文档上。有关更改事件文档的示例,请参阅更改事件。
可用性
部署
db.watch()
适用于副本集和分片集群:
对于副本集,您可在任何承载数据的节点上发出
db.watch()
。对于分片集群,您必须在一个
mongos
实例上发出db.watch()
。
引擎加密
只能将 db.watch()
与 Wired Tiger 存储引擎一起使用。
读关注majority
支持
从 MongoDB 4.2 开始,无论是否有 "majority"
读关注支持,change stream 都可用;也就是说,读关注 majority
支持可以启用(默认),也可以禁用以使用 change stream。
在 MongoDB 4.0 及更早版本中,仅当启用 "majority"
读关注支持时,才能使用变更流。
行为
您不能在
admin
、local
或config
数据库中运行db.watch()
。db.watch()
仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
您可以为不存在的数据库运行
db.watch()
。但是,一旦创建了数据库并且删除了该数据库,变更流游标就会关闭。
可恢复性
与 MongoDB 驱动程序不同,mongosh
不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
db.watch()
使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter
或 startAfter
选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.watch()
无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
访问控制
使用访问控制来运行时,用户必须对数据库资源具有 find
和 changeStream
特权动作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream"] }
内置 read
角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext()
方法会阻塞并等待下一个事件。要监控 watchCursor
游标并迭代事件,请使用 hasNext()
,如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext()
方法为非阻塞式。要监控 watchCursor
游标并迭代事件,请使用 tryNext()
,如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例子
mongosh
中的以下操作在 hr
数据库中打开变更流游标。返回的游标报告该数据库中所有非 system
集合的数据变更。
watchCursor = db.getSiblingDB("hr").watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.tryNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted()
与变更流结合使用。