类:Mongo::Collection::View::ChangeStream

继承:
聚合(Aggregation) 显示全部
包括:
可重试
定义于:
build/Ruby-driver-v 2.19 /lib/mongo/collection/view/change_stream.rb ,
build/Ruby-driver-v 2.19 /lib/mongo/collection/view/change_stream/retryable.rb

Overview

注意:

仅适用于服务器版本3.6及更高版本。

注意:

由于此处记录的问题,ChangeStreams 无法与 JRuby 一起正常工作: github.com/jRuby/jRuby/issues/ 4212 。 也就是说,JRuby 会在背景绿色线程中急切地对枚举器求值 #next,因此在变更流上调用 #next 将导致在背景循环调用 getMores。

提供围绕聚合框架中“$changeStream”管道阶段的行为。 通过指定此阶段,用户可以请求就特定集合或数据库的所有更改发送通知。

由于:

  • 2.5.0

在命名空间下定义

模块: 可重试

常量摘要折叠

FULL_DOCUMENT_DEFAULT =

返回 fullDocument 选项的默认值。

返回:

  • ( string )

    fullDocument 选项默认值。

由于:

  • 2.5.0

' default '.冻结
DATABASE =

用于指示变更流应侦听整个数据库而不仅仅是集合的更改。

返回:

  • (符号)

    用于指示变更流应侦听整个数据库而不仅仅是集合的更改。

由于:

  • 2.6.0

:database
集群 =

用于指示变更流应侦听整个集群而不仅仅是集合上的变更。

返回:

  • (符号)

    用于指示变更流应侦听整个集群而不仅仅是集合上的变更。

由于:

  • 2.6.0

:cluster

聚合继承的常量

Aggregation::REROUTE

Loggable中包含的常量

Loggable::PREFIX

可解释性中包含的常量

explainable::ALL_PLANS_EXECUTION 、explainable ::EXECUTION_STATS 、explainable ::QUERY_PLANNER

实例属性摘要折叠

Aggregation继承的属性

#view

Iterable包含的属性

#cursor

实例方法摘要折叠

Aggregation继承的方法

#allow_disk_use#explain#write?

Retryable 中包含的方法

#read_worker#select_server#write_worker

Loggable中包含的方法

#log_debug#log_error#log_ Fatal#log_info#log_warn#logger

方法包含在可解释

#explain

Iterable包含的方法

#close_query

构造函数详情

#initialize (view, 管道,changes_for, options = {}) ⇒ ChangeStream

为提供的集合视图、管道和选项初始化变更流。

例子:

创建新的变更流视图。

ChangeStream.new(view, pipeline, options)

参数:

  • 查看 ( Collection::View )

    集合视图。

  • 管道 ( Array<Hash> )

    用于过滤变更通知的操作符管道。

  • 选项 哈希 (默认为: {}

    变更流选项。

选项哈希 ( options ):

  • :full_document string

    允许的值:nil、'default'、'updateLookup'、'whenAvailable'、'required'。

    默认为不发送值(即 nil),相当于“default”。 默认情况下,部分更新的变更通知将包含描述文档变更的增量。

    当设置为“updateLookup”时,部分更新的变更通知将包括描述文档更改的增量,以及自更改发生后某个时间以来已更改的整个文档的副本。

    当设置为 'whenAvailable' 时,将变更流配置为返回替换和更新变更事件的已修改文档的后像(如果此事件的后像可用)。

    当设置为“必需”时,行为与“whenAvailable”相同,只是如果后图像不可用,则会引发错误。

  • :full_document_before_change string

    允许的值:nil、'whenAvailable'、'required'、'off'。

    默认为不发送值(即 nil),相当于“off”。

    当设置为 'whenAvailable' 时,将变更流配置为返回用于替换、更新和删除变更事件的已修改文档的前像(如果可用)。

    当设置为“必需”时,行为与“whenAvailable”相同,只是如果前像不可用,则会引发错误。

  • :resume_after BSON::DocumentHash

    指定新变更流的逻辑起点。

  • :max_await_time_ms 整数

    服务器等待新文档满足变更流查询的最长时间。

  • :batch_size 整数

    批次中要返回的文档数量。

  • :collation BSON::DocumentHash

    要使用的排序规则。

  • :start_at_operation_time ( BSON::Timestamp )

    仅返回在指定时间戳时或之后发生的更改。 针对服务器运行的任何命令都将返回可在此处使用的集群时间。 仅由服务器版本4.0 + 识别。

  • :start_after Bson::DocumentHash

    与 :resume_after 类似,此选项接受恢复令牌并启动新的变更流,返回令牌后的第一个通知。 这将允许用户观看已删除并重新创建的集合或新重命名的集合,而不会错过任何通知。

  • :comment 对象

    用户提供的待附加到该命令的注释。

  • :show_expanded_events 布尔值

    使服务器能够发送变更流事件的“扩展”列表。 此标志集包含的其他事件列表包括:createIndexes、dropIndexes、modify、create、shardCollection、reshardCollection、refinedCollectionShardKey。

    如果同时指定了“startAfter”和“resumeAfter”,服务器将报告错误。

由于:

  • 2.5.0



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第127行

def 初始化(查看, 管道, changes_for, 选项 = {})
  @view = 查看
  @changes_for = changes_for
  @change_stream_filters = 管道 && 管道.dup
  @options = 选项 && 选项.dup.冻结
  @start_after = @options[:start_after]

  # 变更流跟踪的恢复令牌,仅使用
  # 当没有游标或没有游标恢复令牌时
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # 恢复变更流时发送不同的参数
  # 与发送第一个查询时相比
  @resuming = true
end

实例属性详细信息

# options =" BSON::Document" (只读)

返回变更流选项。

返回:

  • ( BSON::Document )

    变更流选项。

由于:

  • 2.5.0



61
62
63
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第61行

def 选项
  @options
end

实例方法详细信息

# closenil

注意:

此方法尝试关闭变更流使用的游标,从而关闭服务器端变更流游标。 此方法忽略关闭服务器端游标时发生的任何错误。

关闭变更流。

例子:

关闭变更流。

stream.close

返回:

  • ( nil )

    始终为零。

由于:

  • 2.5.0



237
238
239
240
241
242
243
244
245
246
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第237行

def 关闭
  除非 已关闭?
    开始
      @cursor.关闭
    救援 错误::OperationFailure, 错误::SocketError, 错误::SocketTimeoutError, 错误::MissingConnection
      # 忽略
    end
    @cursor = nil
  end
end

#已关闭?true , false

变更流是否已关闭?

例子:

确定变更流是否已关闭。

stream.closed?

返回:

  • ( true , false )

    如果变更流已关闭。

由于:

  • 2.5.0



256
257
258
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第256行

def 已关闭?
  @cursor.nil?
end

#每个{|Each| ... } ⇒枚举器

遍历变更流返回的文档。

对于可恢复错误,此方法会针对每个错误重试一次(连续两个错误会导致引发第二个错误,如果恢复错误,则会将错误计数重置为零)。

例子:

遍历文档流。

stream.each do |document|
  p document
end

收益参数:

  • 每个 ( BSON::Document )

    变更流文档。

返回:

  • (枚举器)

    枚举器。

由于:

  • 2.5.0



161
162
163
164
165
166
167
168
169
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第161行

def 
  提高 StopIteration.new if 已关闭?
  循环 do
    文档 = try_next
    产量 文档 if 文档
  end
救援 StopIteration
  return self
end

#检查string

获取用于检查的格式化string 。

例子:

检查变更流对象。

stream.inspect

返回:

  • ( string )

    变更流检查。

由于:

  • 2.5.0



268
269
270
271
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第268行

def 检查
  " #<Mongo::Collection::View:ChangeStream: 0 x #{ object_id } filters= #{ @change_stream_filters } " +
    " options= #{ @options } resume_token= #{ resume_token } > "
end

#resume_tokenBSON::Document | nil

返回流用于自动恢复的恢复令牌(如果存在)。

例子:

获取变更流恢复令牌。

stream.resume_token

返回:

  • ( BSON::Document | nil )

    变更流恢复令牌。

由于:

  • 2.10.0



282
283
284
285
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第282行

def resume_token
  cursor_resume_token = @cursor.resume_token if @cursor
  cursor_resume_token || @resume_token
end

# to_enum对象

由于:

  • 2.5.0



213
214
215
216
217
218
219
220
221
222
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第213行

def to_enum
  枚举 = 
  枚举.发送(:instance_variable_set, ' @obj ', self)
  class << 枚举
    def try_next
      @obj.try_next
    end
  end
  枚举
end

# try_nextBSON::Document | nil

从变更流中返回一份文档(如果有)。

出现可恢复错误时重试一次。

如果变更流已关闭,则引发 StopIteration。

此方法将等待来自服务器的更改长达 max_await_time_ms 毫秒,如果没有收到更改,将返回 nil。

返回:

  • ( BSON::Document | nil )

    变更流文档。

引发:

  • StopIteration

由于:

  • 2.6.0



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File ' 构建/ruby-driver-v2.19/lib/ mongo / 集合/view/change_stream.rb', 第183行

def try_next
  提高 StopIteration.new if 已关闭?
  开始
    doc = @cursor.try_next
  救援 mongo::错误 => e
    if !e.change_stream_resumable?
      提高
    end

    # 重新运行初始聚合。
    # 此处出现任何错误都将停止迭代并退出
    # 方法。

    # 保存游标的恢复令牌,以便我们使用
    # 创建一个新游标
    @resume_token = @cursor.resume_token

    关闭
    create_cursor!
    重试
  end

  # 我们需要验证每个文档都有一个_id,所以我们
  # 有一个可以使用的恢复令牌
  if doc && doc['_id'].nil?
    提高 错误::MissingResumeToken
  end
  doc
end