类:Mongo::Collection::View::ChangeStream

继承:
聚合(Aggregation) 显示全部
包括:
Aggregation::Behavior ,可重试
定义于:
lib/ Mongo/ 集合/view/change_stream.rb
lib/ Mongo/ 集合/view/change_stream/retryable.rb

Overview

注意:

仅适用于服务器版本3.6及更高版本。

注意:

由于此处记录的问题,ChangeStreams 无法与 JRuby 一起正常工作: github.com/jRuby/jRuby/issues/ 4212 。 也就是说,JRuby 会在背景绿色线程中急切地对枚举器求值 #next,因此在变更流上调用 #next 将导致在背景循环调用 getMores。

提供围绕聚合框架中“$changeStream”管道阶段的行为。 通过指定此阶段,用户可以请求就特定集合或数据库的所有更改发送通知。

由于:

  • 2.5.0

在命名空间下定义

模块: 可重试

常量摘要折叠

FULL_DOCUMENT_DEFAULT =

返回 fullDocument 选项的默认值。

返回:

  • ( string )

    fullDocument 选项默认值。

由于:

  • 2.5.0

' default '.冻结
DATABASE =

用于指示变更流应侦听整个数据库而不仅仅是集合的更改。

返回:

  • (符号)

    用于指示变更流应侦听整个数据库而不仅仅是集合的更改。

由于:

  • 2.6.0

:database
集群 =

用于指示变更流应侦听整个集群而不仅仅是集合上的变更。

返回:

  • (符号)

    用于指示变更流应侦听整个集群而不仅仅是集合上的变更。

由于:

  • 2.6.0

:cluster

Loggable中包含的常量

Loggable::PREFIX

可解释性中包含的常量

explainable::ALL_PLANS_EXECUTION 、explainable ::EXECUTION_STATS 、explainable ::QUERY_PLANNER

实例属性摘要折叠

Aggregation::Behavior 中包含的属性

#view

实例方法摘要折叠

Retryable 中包含的方法

#read_worker#select_server#write_worker

Aggregation::Behavior 中包含的方法

#allow_disk_use , #explain , #timeout_ms , # 写入?

Loggable中包含的方法

#log_debug#log_error#log_ Fatal#log_info#log_warn#logger

方法包含在可解释

#explain

Iterable包含的方法

#close_query

Mongo::CursorHost 中包含的方法

#validate_timeout_mode!

构造函数详情

#initialize (view, 管道,changes_for, options = {}) ⇒ ChangeStream

为提供的集合视图、管道和选项初始化变更流。

例子:

创建新的变更流视图。

ChangeStream.new(view, pipeline, options)

参数:

  • 查看 ( Collection::View )

    集合视图。

  • 管道 ( Array<Hash> )

    用于过滤变更通知的操作符管道。

  • 选项 哈希 (默认为: {}

    变更流选项。

选项哈希 ( options ):

  • :full_document string

    允许的值:nil、'default'、'updateLookup'、'whenAvailable'、'required'。

    默认为不发送值(即 nil),相当于“default”。 默认情况下,部分更新的变更通知将包含描述文档变更的增量。

    当设置为“updateLookup”时,部分更新的变更通知将包括描述文档更改的增量,以及自更改发生后某个时间以来已更改的整个文档的副本。

    当设置为 'whenAvailable' 时,将变更流配置为返回替换和更新变更事件的已修改文档的后像(如果此事件的后像可用)。

    当设置为“必需”时,行为与“whenAvailable”相同,只是如果后图像不可用,则会引发错误。

  • :full_document_before_change string

    允许的值:nil、'whenAvailable'、'required'、'off'。

    默认为不发送值(即 nil),相当于“off”。

    当设置为 'whenAvailable' 时,将变更流配置为返回用于替换、更新和删除变更事件的已修改文档的前像(如果可用)。

    当设置为“必需”时,行为与“whenAvailable”相同,只是如果前像不可用,则会引发错误。

  • :resume_after BSON::DocumentHash

    指定新变更流的逻辑起点。

  • :max_await_time_ms 整数

    服务器等待新文档满足变更流查询的最长时间。

  • :batch_size 整数

    批次中要返回的文档数量。

  • :collation BSON::DocumentHash

    要使用的排序规则。

  • :start_at_operation_time ( BSON::Timestamp )

    仅返回在指定时间戳时或之后发生的更改。 针对服务器运行的任何命令都将返回可在此处使用的集群时间。 仅由服务器版本4.0 + 识别。

  • :start_after Bson::DocumentHash

    与 :resume_after 类似,此选项接受恢复令牌并启动新的变更流,返回令牌后的第一个通知。 这将允许用户观看已删除并重新创建的集合或新重命名的集合,而不会错过任何通知。

  • :comment 对象

    用户提供的待附加到该命令的注释。

  • :show_expanded_events 布尔值

    使服务器能够发送变更流事件的“扩展”列表。 此标志集包含的其他事件列表包括:createIndexes、dropIndexes、modify、create、shardCollection、reshardCollection、refinedCollectionShardKey。

    如果同时指定了“startAfter”和“resumeAfter”,服务器将报告错误。

由于:

  • 2.5.0



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 133

def 初始化(查看, 管道, changes_for, 选项 = {})
  #变更流游标只能是 :iterable,因此我们不允许
  # 要指定的 timeout_mode。
  perper_setup(查看, 选项, forbid: %i[ timeout_mode ]) do
    @changes_for = changes_for
    @change_stream_filters = 管道 && 管道.dup
    @start_after = @options[:start_after]
  end

  # 变更流跟踪的恢复令牌,仅使用
  # 当没有游标或没有游标恢复令牌时
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # 恢复变更流时发送不同的参数
  # 与发送第一个查询时相比
  @resuming = true
end

实例属性详细信息

# 游标 游标(只读)

此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。

返回此操作的根本的游标。

返回:

  • (Cursor)

    此操作的根本的游标

由于:

  • 2.5.0



67
68
69
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 67

def cursor
  @cursor
end

# options =" BSON::Document" (只读)

返回变更流选项。

返回:

  • ( BSON::Document )

    变更流选项。

由于:

  • 2.5.0



63
64
65
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 63

def 选项
  @options
end

实例方法详细信息

# close (opts = {}) ⇒ nil

注意:

此方法尝试关闭变更流使用的游标,从而关闭服务器端变更流游标。 此方法忽略关闭服务器端游标时发生的任何错误。

关闭变更流。

例子:

关闭变更流。

stream.close

返回:

  • ( nil )

    始终为零。

由于:

  • 2.5.0



254
255
256
257
258
259
260
261
262
263
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 254

def 关闭(opts = {})
  除非 已关闭?
    开始
      @cursor.关闭(opts)
    救援 错误::OperationFailure::家庭情况, 错误::SocketError, 错误::SocketTimeoutError, 错误::MissingConnection
      # 忽略
    end
    @cursor = nil
  end
end

#已关闭?true , false

变更流是否已关闭?

例子:

确定变更流是否已关闭。

stream.closed?

返回:

  • ( true , false )

    如果变更流已关闭。

由于:

  • 2.5.0



273
274
275
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 273

def 已关闭?
  @cursor.nil?
end

# cursor_type 对象

“变更流是对 tailable-awaitData 游标的抽象……”

返回:

  • :tailable_await

由于:

  • 2.5.0



307
308
309
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 307

def cursor_type
  :tailable_await
end

#每个{|Each| ... } ⇒枚举器

遍历变更流返回的文档。

对于可恢复错误,此方法会针对每个错误重试一次(连续两个错误会导致引发第二个错误,如果恢复错误,则会将错误计数重置为零)。

例子:

遍历文档流。

stream.each do |document|
  p document
end

收益参数:

  • 每个 ( BSON::Document )

    变更流文档。

返回:

  • (枚举器)

    枚举器。

由于:

  • 2.5.0



169
170
171
172
173
174
175
176
177
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 169

def 
  提高 StopIteration.new if 已关闭?
  循环 do
    文档 = try_next
    产量 文档 if 文档
  end
救援 StopIteration
  return self
end

#检查string

获取用于检查的格式化string 。

例子:

检查变更流对象。

stream.inspect

返回:

  • ( string )

    变更流检查。

由于:

  • 2.5.0



285
286
287
288
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 285

def 检查
  " #<Mongo::Collection::View:ChangeStream: 0 x #{ object_id } filters= #{ @change_stream_filters } " +
    " options= #{ @options } resume_token= #{ resume_token } > "
end

# max_await_time_ms 整数 | nil

返回传递给此变更流的max_await_time_ms 选项的值。

返回:

  • ( Integer | nil )

    max_await_time_ms 值

由于:

  • 2.5.0



322
323
324
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 322

def max_await_time_ms
  选项[:max_await_time_ms]
end

#resume_tokenBSON::Document | nil

返回流用于自动恢复的恢复令牌(如果存在)。

例子:

获取变更流恢复令牌。

stream.resume_token

返回:

  • ( BSON::Document | nil )

    变更流恢复令牌。

由于:

  • 2.10.0



299
300
301
302
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 299

def resume_token
  cursor_resume_token = @cursor.resume_token if @cursor
  cursor_resume_token || @resume_token
end

# timeout_mode 对象

“变更流……隐式使用迭代模式”

返回:

  • :iteration

由于:

  • 2.5.0



314
315
316
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 314

def timeout_mode
  :iteration
end

# to_enum对象

由于:

  • 2.5.0



227
228
229
230
231
232
233
234
235
236
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 227

def to_enum
  枚举 = 
  枚举.发送(:instance_variable_set, ' @obj ', self)
  class << 枚举
    def try_next
      @obj.try_next
    end
  end
  枚举
end

# try_nextBSON::Document | nil

从变更流中返回一份文档(如果有)。

出现可恢复错误时重试一次。

如果变更流已关闭,则引发 StopIteration。

此方法将等待来自服务器的更改长达 max_await_time_ms 毫秒,如果没有收到更改,将返回 nil。

返回:

  • ( BSON::Document | nil )

    变更流文档。

引发:

  • StopIteration

由于:

  • 2.6.0



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 191

def try_next
  recreate_cursor! if @timed_out

  提高 StopIteration.new if 已关闭?

  开始
    doc = @cursor.try_next
  救援 mongo::错误 => e
    # "如果下一次调用因超时错误而失败,驱动程序不得
    # 使变更流失效。 随后的 next 调用必须
    # 执行恢复尝试以建立新的变更流
    # 服务器..."
    #
    # 然而,SocketTimeoutErrors 是 TimeoutErrors,但也是
    # change-stream-resumable. 要保留现有(指定)行为,
    # 我们只在错误不存在时才计算超时
    # change-stream-resumable.
    @timed_out = e.is_a?(mongo::错误::超时错误) && !e.change_stream_resumable?

    提高 除非 @timed_out || e.change_stream_resumable?

    @resume_token = @cursor.resume_token
    提高 e if @timed_out

    recreate_cursor!(@cursor.上下文)
    重试
  end

  # 我们需要验证每个文档都有一个_id,所以我们
  # 有一个可以使用的恢复令牌
  if doc && doc['_id'].nil?
    提高 错误::MissingResumeToken
  end
  doc
end