클래스: Mongo::Collection::View::ChangeStream
- 다음을 포함합니다.
- Aggregation::Behavior, 재시도 가능
- 다음에 정의됨:
- lib/ Mongo/ 컬렉션/view/change_stream.rb,
lib/ Mongo/ 컬렉션/view/change_stream/retryable.rb more...
개요
서버 버전 3.6 이상에서만 사용할 수 있습니다.
ChangeStreams는 여기에 설명된 문제로 인해 JRuby에서 제대로 작동하지 않습니다: github.com/jruby/jruby/issues/4212. 즉, JRuby는 배경 녹색 스레드의 열거자에서 #next를 열심히 평가하므로 변경 스트림 에서 #next를 호출하면 배경 의 루프에서 getMores가 호출됩니다.
집계 프레임워크 의 '$changeStream' 파이프라인 단계와 관련된 동작을 제공합니다. 이 단계를 지정하면 사용자가 특정 컬렉션 또는 데이터베이스 의 모든 변경 사항에 대해 알림 을 보내도록 요청 수 있습니다.
네임스페이스 아래에 정의됨
모듈: 재시도 가능
상수 요약 접기
- FULL_DOCUMENT_DEFAULT =
fullDocument 옵션 기본값 을 반환합니다.
'default'.동결
- DATABASE =
반환값 변경 스트림 이 컬렉션 뿐만 아니라 전체 데이터베이스 에서 변경 사항을 수신해야 함을 나타내는 데 사용됩니다.
:database
- CLUSTER =
반환값 변경 스트림 이 컬렉션 뿐만 아니라 전체 클러스터 에서 변경 사항을 수신 대기해야 함을 나타내는 데 사용됩니다.
:cluster
Loggable에서 포함된 상수
Explainable에 포함된 상수
Explainable::ALL_PLANS_EXECUTION, Explainable ::EXECUTION_STATS, Explainable::QUERY_PLANNER
인스턴스 속성 요약 접기
-
#cursor ⇒ Cursor
읽기 전용
비공개
이 작업의 기본 커서 입니다.
-
#options ⇒ BSON::Document
읽기 전용
변경 스트림 옵션.
Aggregation::Behavior에포함된 속성
인스턴스 메서드 요약 접기
-
#close(opts = {}) ⇒ nil
변경 스트림을 닫습니다.
-
#닫았나요? ⇒ 참, 거짓
변경 스트림이 닫혔나요?
-
#cursor_type ⇒ 객체
"변경 스트림은 tailable-awaitData 커서를 추상화한 것입니다...".
-
#각 {|Each| ... } ⇒ 열거자
변경 스트림에서 반환된 문서를 반복합니다.
-
#initialize(view, 파이프라인, changes_for, options = {}) ⇒ ChangeStream
생성자
제공된 collection 보기, 파이프라인 및 옵션에 대한 변경 스트림을 초기화합니다.
-
#검사 ⇒ string
검사에 사용할 형식이 지정된 string 을 가져옵니다.
-
#max_await_time_ms ⇒ 정수 | nil
이 변경 스트림 에 전달된 max_await_time_ms 옵션의 값을 반환합니다.
-
#resume_token ⇒ BSON::Document | nil
스트림 이 자동으로 재개하는 데 사용할 재개 토큰이 있는 경우 이를 반환합니다.
-
#timeout_mode ⇒ 객체
"변경 스트림...암시적으로 반복 모드 사용 ".
- #to_enum ⇒ 객체
-
#try_next ⇒ BSON::Document | nil
변경 스트림 에서 문서 하나를 반환합니다(사용 가능한 경우).
Retryable에 포함된 메서드
#read_Worker, #select_server, #write_Worker
Aggregation::Behavior에포함된 메서드
#allow_disk_use, #explain, #timeout_ms, # 쓰기 (write)?
Loggable에 포함된 메서드
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Explainable에 포함된 메서드
Iterable에 포함된 메서드
Mongo::CursorHost에 포함된 메서드
생성자 세부 정보
#initialize(view, 파이프라인, changes_for, options = {}) ⇒ ChangeStream
제공된 collection 보기, 파이프라인 및 옵션에 대한 변경 스트림을 초기화합니다.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 133 def 초기화(보기, 파이프라인, change_for, = {}) # 변경 스트림 커서는 :iterable만 가능하므로 허용하지 않습니다. # timeout_mode를 지정합니다. Perform_setup(보기, , forbid: %i[ timeout_mode ]) do @changes_for = change_for @change_stream_filters = 파이프라인 && 파이프라인.dup @start_after = @options[:start_after] end # 변경 스트림에서 추적하는 재개 토큰으로, 만 사용됩니다. # 커서가 없거나 커서가 없는 경우 토큰 재개 @resume_token = @start_after || @options[:resume_after] create_cursor! # 변경 스트림을 재개할 때 다른 매개변수를 전송합니다. # 첫 번째 쿼리를 보낼 때와 비교 @resuming = true end |
인스턴스 속성 세부 정보
#커서 ⇒ 커서(읽기 전용)
이 메서드는 비공개 API의 일부입니다. 이 방법은 향후 제거되거나 변경될 수 있으므로 가능하면 사용하지 않는 것이 좋습니다.
이 작업의 기본 커서 반환합니다.
67 68 69 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 67 def cursor @cursor end |
#options ⇒ BSON::Document (읽기 전용)
변경 스트림 옵션을 반환합니다.
63 64 65 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 63 def @options end |
인스턴스 메서드 세부 정보
#close(opts = {}) ⇒ nil
이 메서드는 변경 스트림에서 사용하는 커서를 닫으려고 시도하며, 결과적으로 서버 측 변경 스트림 커서가 닫힙니다. 이 메서드는 서버 측 커서를 닫을 때 발생하는 모든 오류를 무시합니다.
변경 스트림을 닫습니다.
254 255 256 257 258 259 260 261 262 263 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 254 def 닫기(opts = {}) 하지 않는 한 닫힘? 시작 @cursor.닫기(opts) 구출 오류::OperationFailure::패밀리, 오류::SocketError, 오류::SocketTimeoutError, 오류::MissingConnection # 무시 end @cursor = nil end end |
#닫았나요? ⇒ true, false
변경 스트림이 닫혔나요?
273 274 275 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 273 def 닫힘? @cursor.nil? end |
#cursor_type ⇒ 객체
"변경 스트림은 tailable-awaitData 커서를 추상화한 것입니다..."
307 308 309 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 307 def cursor_type :tailable_await end |
#각 {|Each| ... } ⇒ 열거자
변경 스트림에서 반환된 문서를 반복합니다.
이 메서드는 재개 가능한 오류에 대해 오류당 한 번씩 재시도합니다(오류가 두 번 연속으로 발생하면 두 번째 오류가 발생하고, 에서 복구된 오류는 오류 횟수를 0으로 재설정).
169 170 171 172 173 174 175 176 177 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 169 def 각 올리다 StopIteration.신규 만약 닫힘? 루프 do 문서 = try_next yield 문서 만약 문서 end 구출 StopIteration 반환 self end |
#검사 ⇒ string
검사에 사용할 형식이 지정된 string 을 가져옵니다.
285 286 287 288 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 285 def 검사 "#<Mongo::Collection::View:ChangeStream:0x#{object_id} 필터=#{@change_stream_filters} " + "options=#{@options} resume_token=#{resume_token}>" end |
#max_await_time_ms ⇒ 정수 | nil
이 변경 스트림 에 전달된 max_await_time_ms 옵션의 값을 반환합니다.
322 323 324 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 322 def max_await_time_ms [:max_await_time_ms] end |
#resume_token ⇒ BSON::Document | nil
스트림 이 자동으로 재개하는 데 사용할 재개 토큰이 있는 경우 이를 반환합니다.
299 300 301 302 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 299 def resume_token cursor_resume_token = @cursor.resume_token 만약 @cursor cursor_resume_token || @resume_token end |
#timeout_mode ⇒ 객체
"변경 스트림...암시적으로 반복 모드 사용 "
314 315 316 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 314 def timeout_mode :iteration end |
#to_enum ⇒ 객체
227 228 229 230 231 232 233 234 235 236 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 227 def to_enum 열거형 = super 열거형.send(:instance_variable_set, '@obj', self) 클래스 << 열거형 def try_next @obj.try_next end end 열거형 end |
#try_next ⇒ BSON::Document | nil
변경 스트림 에서 문서 하나를 반환합니다(사용 가능한 경우).
재개 가능한 오류가 발생하면 한 번 재시도합니다.
변경 스트림이 닫히면 StopIteration을 발생시킵니다.
이 메서드는 서버 의 변경 사항을 최대 max_await_time_ms 밀리초 동안 기다리며, 변경 사항이 수신되지 않으면 nil을 반환합니다.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 191 def try_next recreate_cursor! 만약 @timed_out 올리다 StopIteration.신규 만약 닫힘? 시작 doc = @cursor.try_next 구출 mongo::오류 => e # "시간 초과 오류로 인해 다음 호출이 실패하는 경우 드라이버는 # 변경 스트림 무효화합니다. 후속 다음 호출은 반드시 # 새 변경 스트림 설정하기 위해 재개 시도를 수행합니다. # 서버..." # # 그러나 SocketTimeoutErrors는 TimeoutErrors이지만, # change-stream-resumable. 기존(지정된) 동작을 보존하려면 다음을 수행합니다. # 오류가 다음과 같지 않은 경우에만 시간 초과를 계산합니다. # change-stream-resumable. @timed_out = e.is_a?(mongo::오류::시간 초과 오류) && !e.change_stream_resumable? 올리다 하지 않는 한 @timed_out || e.change_stream_resumable? @resume_token = @cursor.resume_token 올리다 e 만약 @timed_out recreate_cursor!(@cursor.컨텍스트) 재시도 end # 각 문서에 _id 가 있는지 확인해야 하므로 # 작업할 재개 토큰이 있습니다. 만약 doc && doc['_id'].nil? 올리다 오류::MissingResumeToken end doc end |