Change Streams
从 MongoDB Server 3.6 版本开始,聚合框架支持新的 $changeStream
管道阶段。首先在聚合管道中指定此阶段,允许用户请求为特定collection的所有更改发送通知。从 MongoDB 4.0 开始,除了collection之外,数据库和集群还支持change stream。
Ruby 驱动程序提供了一个 API,用于使用这个新的管道阶段接收特定collection、数据库或集群变更的通知。虽然您可以直接使用管道操作符和聚合框架创建变更流,但建议使用下述驱动程序 API,因为如果出现超时、网络错误、服务器错误(表明以下情况),驱动程序会恢复变更流一次:正在进行故障转移或出现其他类型的可恢复错误。
服务器上的change stream需要"majority"
读关注(read concern)或无读关注(read concern)。
由于 此处 记录的问题,变更流无法与 JRuby 一起正常工作 。也就是说,JRuby 会在后台绿色线程中急切地对枚举器上的#next
求值,因此在变更流上调用#next
将导致在后台循环中调用 getMores。
监视collection上的更改
集合变更流是通过在集合上调用#watch
方法来创建的:
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') collection = client[:test] stream = collection.watch collection.insert_one(a: 1) doc = stream.to_enum.next process(doc)
您还可以在通知可用时接收通知:
stream = collection.watch enum = stream.to_enum while doc = enum.next process(doc) end
next
方法会阻止并轮询集群,直到有更改可用。 使用try_next
方法以无阻塞方式迭代change stream;此方法将等待来自server的更改长达 max_await_time_ms 毫秒,如果没有收到更改,将返回 nil。如果出现不可恢复的错误, next
和try_next
都会引发异常。 有关无限期地从collection中读取变更的示例,请参阅下面的“恢复change stream”部分。
change stream可以采用聚合框架管道操作符格式的筛选器:
stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } }, {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } } ]) enum = stream.to_enum while doc = enum.next process(doc) end
监视数据库更改
数据库变更流会通知数据库内任何集合的变更以及数据库范围的事件,例如删除数据库。
change stream 是通过对数据库对象调用#watch
方法来创建的:
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') database = client.database stream = database.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
观察集群上的变更
集群change stream会通知集群内任何collection、任何数据库的变更以及集群范围的事件。
change stream是通过在对象(而不是集群)上调用#watch
方法来创建的:
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') stream = client.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
关闭change stream
您可以通过调用变更流的#close
方法来关闭变更流:
stream.close
恢复 Change Stream
change stream由两类操作组成:初始聚合和接收下一批处理的getMore
请求。
出现网络错误以及服务器返回表明其状态已更改(例如,不再是主节点)的错误时,驱动程序将自动重试每个getMore
操作。 驱动程序不会重试初始聚合。
实际上,这意味着:
如果集群没有足够的可用节点来满足
"majority"
读取偏好(read preference),则调用collection.watch
将失败。一旦
collection.watch
成功返回,如果集群随后经历选举或丢失节点,但恢复速度足够快,则通过next
或each
方法进行的change stream读取将继续对应用程序透明。
为了无限期且可靠地监视变更,而不会丢失任何更改或多次处理更改,应用程序必须跟踪变更流的恢复令牌,并在遇到导致驱动程序自动恢复失败的扩展错误条件时重新启动变更流。 。 以下代码片段展示了无限迭代变更流、使用resume_token
变更流方法检索恢复令牌以及在所有 MongoDB 或网络错误时使用:resume_after
选项重新启动变更流的示例:
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum while doc = enum.next process(doc) token = stream.resume_token end rescue Mongo::Error sleep 1 end end
上述迭代在enum.next
调用处阻塞,并且不允许在运行此代码的 Ruby 进程终止时恢复处理。驱动程序还提供了try_next
方法,当change stream中没有更改时,该方法会返回nil
(在一小段等待时间后),而不是无限期地阻塞。使用try_next
方法,即使特定请求未返回任何更改,恢复令牌也可以在每个getMore
请求后保留,从而恢复令牌保留在 oplog 的顶部,并且应用程序有机会如果处理更改的进程终止,则将其持久化:
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum doc = enum.try_next if doc process(doc) end token = stream.resume_token # Persist +token+ to support resuming processing upon process restart rescue Mongo::Error sleep 1 end end
请注意,每次try_next
调用后都应从change stream中检索恢复令牌,即使该调用未返回任何文档。
_id
每个change stream文档的字段中还提供了恢复令牌。不建议读取_id
字段,因为它可能会被应用程序投影出来,而且当getMore
未返回文档时,仅使用_id
字段不会推进恢复令牌。