Docs 菜单
Docs 主页
/ / /
Ruby MongoDB 驱动程序
/

Change Streams

在此页面上

  • 监视collection上的更改
  • 监视数据库更改
  • 观察集群上的变更
  • 关闭change stream
  • 恢复 Change Stream

从 MongoDB Server 3.6 版本开始,聚合框架支持新的 $changeStream管道阶段。首先在聚合管道中指定此阶段,允许用户请求为特定collection的所有更改发送通知。从 MongoDB 4.0 开始,除了collection之外,数据库和集群还支持change stream。

Ruby 驱动程序提供了一个 API,用于使用这个新的管道阶段接收特定collection、数据库或集群变更的通知。虽然您可以直接使用管道操作符和聚合框架创建变更流,但建议使用下述驱动程序 API,因为如果出现超时、网络错误、服务器错误(表明以下情况),驱动程序会恢复变更流一次:正在进行故障转移或出现其他类型的可恢复错误。

服务器上的change stream需要"majority"读关注(read concern)或无读关注(read concern)。

由于 此处 记录的问题,变更流无法与 JRuby 一起正常工作 。也就是说,JRuby 会在后台绿色线程中急切地对枚举器上的#next求值,因此在变更流上调用#next将导致在后台循环中调用 getMores。

集合变更流是通过在集合上调用#watch方法来创建的:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

您还可以在通知可用时接收通知:

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
process(doc)
end

next方法会阻止并轮询集群,直到有更改可用。 使用try_next方法以无阻塞方式迭代change stream;此方法将等待来自server的更改长达 max_await_time_ms 毫秒,如果没有收到更改,将返回 nil。如果出现不可恢复的错误, nexttry_next都会引发异常。 有关无限期地从collection中读取变更的示例,请参阅下面的“恢复change stream”部分。

change stream可以采用聚合框架管道操作符格式的筛选器:

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
{'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
])
enum = stream.to_enum
while doc = enum.next
process(doc)
end

数据库变更流会通知数据库内任何集合的变更以及数据库范围的事件,例如删除数据库。

change stream 是通过对数据库对象调用#watch方法来创建的:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

集群change stream会通知集群内任何collection、任何数据库的变更以及集群范围的事件。

change stream是通过在对象(而不是集群)上调用#watch方法来创建的:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

您可以通过调用变更流的#close方法来关闭变更流:

stream.close

change stream由两类操作组成:初始聚合和接收下一批处理的getMore请求。

出现网络错误以及服务器返回表明其状态已更改(例如,不再是主节点)的错误时,驱动程序将自动重试每个getMore操作。 驱动程序不会重试初始聚合。

实际上,这意味着:

  • 如果集群没有足够的可用节点来满足"majority"读取偏好(read preference),则调用collection.watch将失败。

  • 一旦collection.watch成功返回,如果集群随后经历选举或丢失节点,但恢复速度足够快,则通过nexteach方法进行的change stream读取将继续对应用程序透明。

为了无限期且可靠地监视变更,而不会丢失任何更改或多次处理更改,应用程序必须跟踪变更流的恢复令牌,并在遇到导致驱动程序自动恢复失败的扩展错误条件时重新启动变更流。 。 以下代码片段展示了无限迭代变更流、使用resume_token变更流方法检索恢复令牌以及在所有 MongoDB 或网络错误时使用:resume_after选项重新启动变更流的示例:

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
while doc = enum.next
process(doc)
token = stream.resume_token
end
rescue Mongo::Error
sleep 1
end
end

上述迭代在enum.next调用处阻塞,并且不允许在运行此代码的 Ruby 进程终止时恢复处理。驱动程序还提供了try_next方法,当change stream中没有更改时,该方法会返回nil (在一小段等待时间后),而不是无限期地阻塞。使用try_next方法,即使特定请求未返回任何更改,恢复令牌也可以在每个getMore请求后保留,从而恢复令牌保留在 oplog 的顶部,并且应用程序有机会如果处理更改的进程终止,则将其持久化:

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
doc = enum.try_next
if doc
process(doc)
end
token = stream.resume_token
# Persist +token+ to support resuming processing upon process restart
rescue Mongo::Error
sleep 1
end
end

请注意,每次try_next调用后都应从change stream中检索恢复令牌,即使该调用未返回任何文档。

_id每个change stream文档的字段中还提供了恢复令牌。不建议读取_id字段,因为它可能会被应用程序投影出来,而且当getMore未返回文档时,仅使用_id字段不会推进恢复令牌。

后退

GridFS

来年

会话