类:Mongo::Collection::View::ChangeStream
- 继承:
-
聚合(Aggregation)
- 对象
- 聚合(Aggregation)
- Mongo::Collection::View::ChangeStream
- 包括:
- Aggregation::Behavior ,可重试
- 定义于:
- lib/ Mongo/ 集合/view/change_stream.rb 、
lib/ Mongo/ 集合/view/change_stream/retryable.rb
Overview
仅适用于服务器版本3.6及更高版本。
由于此处记录的问题,ChangeStreams 无法与 JRuby 一起正常工作: github.com/jRuby/jRuby/issues/ 4212 。 也就是说,JRuby 会在背景绿色线程中急切地对枚举器求值 #next,因此在变更流上调用 #next 将导致在背景循环调用 getMores。
提供围绕聚合框架中“$changeStream”管道阶段的行为。 通过指定此阶段,用户可以请求就特定集合或数据库的所有更改发送通知。
在命名空间下定义
模块: 可重试
常量摘要折叠
- FULL_DOCUMENT_DEFAULT =
返回 fullDocument 选项的默认值。
' default '.冻结
- DATABASE =
用于指示变更流应侦听整个数据库而不仅仅是集合的更改。
:database
- 集群 =
用于指示变更流应侦听整个集群而不仅仅是集合上的变更。
:cluster
Loggable中包含的常量
可解释性中包含的常量
explainable::ALL_PLANS_EXECUTION 、explainable ::EXECUTION_STATS 、explainable ::QUERY_PLANNER
实例属性摘要折叠
-
#cursor ⇒ Cursor
只读
private
此操作的根本的游标。
-
#options ⇒ BSON::Document
只读
变更流选项。
Aggregation::Behavior 中包含的属性
实例方法摘要折叠
-
# close (opts = {}) ⇒ nil
关闭变更流。
-
#已关闭? ⇒ true, false
变更流是否已关闭?
-
# cursor_type ⇒ 对象
“变更流是围绕 tailable-awaitData 游标的抽象……”。
-
#每个{|Each| ... } ⇒ 枚举器
遍历变更流返回的文档。
-
#initialize (view, 管道,changes_for, options = {}) ⇒ ChangeStream
构造函数
为提供的集合视图、管道和选项初始化变更流。
-
#检查⇒ string
获取用于检查的格式化string 。
-
# max_await_time_ms ⇒ 整数 | nil
返回传递给此变更流的max_await_time_ms 选项的值。
-
#resume_token ⇒ BSON::Document | nil
返回流用于自动恢复的恢复令牌(如果存在)。
-
# timeout_mode ⇒ 对象
“变更流……隐式使用迭代模式”。
- # to_enum ⇒ 对象
-
# try_next ⇒ BSON::Document | nil
从变更流中返回一份文档(如果有)。
Retryable 中包含的方法
#read_worker 、 #select_server 、 #write_worker
Aggregation::Behavior 中包含的方法
#allow_disk_use , #explain , #timeout_ms , # 写入?
Loggable中包含的方法
#log_debug 、 #log_error 、 #log_ Fatal 、 #log_info 、 #log_warn 、 #logger
方法包含在可解释
从Iterable包含的方法
Mongo::CursorHost 中包含的方法
构造函数详情
#initialize (view, 管道,changes_for, options = {}) ⇒ ChangeStream
为提供的集合视图、管道和选项初始化变更流。
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 133 def 初始化(查看, 管道, changes_for, = {}) #变更流游标只能是 :iterable,因此我们不允许 # 要指定的 timeout_mode。 perper_setup(查看, , forbid: %i[ timeout_mode ]) do @changes_for = changes_for @change_stream_filters = 管道 && 管道.dup @start_after = @options[:start_after] end # 变更流跟踪的恢复令牌,仅使用 # 当没有游标或没有游标恢复令牌时 @resume_token = @start_after || @options[:resume_after] create_cursor! # 恢复变更流时发送不同的参数 # 与发送第一个查询时相比 @resuming = true end |
实例属性详细信息
# 游标 ⇒ 游标(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
返回此操作的根本的游标。
67 68 69 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 67 def cursor @cursor end |
# options =" BSON::Document" (只读)
返回变更流选项。
63 64 65 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 63 def @options end |
实例方法详细信息
# close (opts = {}) ⇒ nil
此方法尝试关闭变更流使用的游标,从而关闭服务器端变更流游标。 此方法忽略关闭服务器端游标时发生的任何错误。
关闭变更流。
254 255 256 257 258 259 260 261 262 263 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 254 def 关闭(opts = {}) 除非 已关闭? 开始 @cursor.关闭(opts) 救援 错误::OperationFailure::家庭情况, 错误::SocketError, 错误::SocketTimeoutError, 错误::MissingConnection # 忽略 end @cursor = nil end end |
#已关闭? ⇒ true , false
变更流是否已关闭?
273 274 275 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 273 def 已关闭? @cursor.nil? end |
# cursor_type ⇒ 对象
“变更流是对 tailable-awaitData 游标的抽象……”
307 308 309 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 307 def cursor_type :tailable_await end |
#每个{|Each| ... } ⇒枚举器
遍历变更流返回的文档。
对于可恢复错误,此方法会针对每个错误重试一次(连续两个错误会导致引发第二个错误,如果恢复错误,则会将错误计数重置为零)。
169 170 171 172 173 174 175 176 177 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 169 def 每 提高 StopIteration.new if 已关闭? 循环 do 文档 = try_next 产量 文档 if 文档 end 救援 StopIteration return self end |
#检查⇒ string
获取用于检查的格式化string 。
285 286 287 288 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 285 def 检查 " #<Mongo::Collection::View:ChangeStream: 0 x #{ object_id } filters= #{ @change_stream_filters } " + " options= #{ @options } resume_token= #{ resume_token } > " end |
# max_await_time_ms ⇒ 整数 | nil
返回传递给此变更流的max_await_time_ms 选项的值。
322 323 324 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 322 def max_await_time_ms [:max_await_time_ms] end |
#resume_token ⇒ BSON::Document | nil
返回流用于自动恢复的恢复令牌(如果存在)。
299 300 301 302 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 299 def resume_token cursor_resume_token = @cursor.resume_token if @cursor cursor_resume_token || @resume_token end |
# timeout_mode ⇒ 对象
“变更流……隐式使用迭代模式”
314 315 316 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 314 def timeout_mode :iteration end |
# to_enum ⇒对象
227 228 229 230 231 232 233 234 235 236 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 227 def to_enum 枚举 = 超 枚举.发送(:instance_variable_set, ' @obj ', self) class << 枚举 def try_next @obj.try_next end end 枚举 end |
# try_next ⇒ BSON::Document | nil
从变更流中返回一份文档(如果有)。
出现可恢复错误时重试一次。
如果变更流已关闭,则引发 StopIteration。
此方法将等待来自服务器的更改长达 max_await_time_ms 毫秒,如果没有收到更改,将返回 nil。
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/ Mongo/ 集合/view/change_stream.rb', line 191 def try_next recreate_cursor! if @timed_out 提高 StopIteration.new if 已关闭? 开始 doc = @cursor.try_next 救援 mongo::错误 => e # "如果下一次调用因超时错误而失败,驱动程序不得 # 使变更流失效。 随后的 next 调用必须 # 执行恢复尝试以建立新的变更流 # 服务器..." # # 然而,SocketTimeoutErrors 是 TimeoutErrors,但也是 # change-stream-resumable. 要保留现有(指定)行为, # 我们只在错误不存在时才计算超时 # change-stream-resumable. @timed_out = e.is_a?(mongo::错误::超时错误) && !e.change_stream_resumable? 提高 除非 @timed_out || e.change_stream_resumable? @resume_token = @cursor.resume_token 提高 e if @timed_out recreate_cursor!(@cursor.上下文) 重试 end # 我们需要验证每个文档都有一个_id,所以我们 # 有一个可以使用的恢复令牌 if doc && doc['_id'].nil? 提高 错误::MissingResumeToken end doc end |