클래스: Mongo::Collection::View::ChangeStream

상속:
집계
  • 객체
모두 표시
다음을 포함합니다.
Aggregation::Behavior, 재시도 가능
다음에 정의됨:
lib/ Mongo/ 컬렉션/view/change_stream.rb,
lib/ Mongo/ 컬렉션/view/change_stream/retryable.rb
more...

개요

참고:

서버 버전 3.6 이상에서만 사용할 수 있습니다.

참고:

ChangeStreams는 여기에 설명된 문제로 인해 JRuby에서 제대로 작동하지 않습니다: github.com/jruby/jruby/issues/4212. 즉, JRuby는 배경 녹색 스레드의 열거자에서 #next를 열심히 평가하므로 변경 스트림 에서 #next를 호출하면 배경 의 루프에서 getMores가 호출됩니다.

집계 프레임워크 의 '$changeStream' 파이프라인 단계와 관련된 동작을 제공합니다. 이 단계를 지정하면 사용자가 특정 컬렉션 또는 데이터베이스 의 모든 변경 사항에 대해 알림 을 보내도록 요청 수 있습니다.

이후:

  • 2.5.0

네임스페이스 아래에 정의됨

모듈: 재시도 가능

상수 요약 접기

FULL_DOCUMENT_DEFAULT =

fullDocument 옵션 기본값 을 반환합니다.

반환합니다:

  • (string)

    fullDocument 옵션 기본값입니다.

이후:

  • 2.5.0

'default'.동결
DATABASE =

반환값 변경 스트림 이 컬렉션 뿐만 아니라 전체 데이터베이스 에서 변경 사항을 수신해야 함을 나타내는 데 사용됩니다.

반환합니다:

  • (기호)

    변경 스트림 이 컬렉션 뿐만 아니라 전체 데이터베이스 에서 변경 사항을 수신 대기해야 함을 나타내는 데 사용됩니다.

이후:

  • 2.6.0

:database
CLUSTER =

반환값 변경 스트림 이 컬렉션 뿐만 아니라 전체 클러스터 에서 변경 사항을 수신 대기해야 함을 나타내는 데 사용됩니다.

반환합니다:

  • (기호)

    변경 스트림이 컬렉션뿐만 아니라 전체 클러스터에서 변경 사항을 수신 대기해야 함을 나타내는 데 사용됩니다.

이후:

  • 2.6.0

:cluster

Loggable에서 포함된 상수

Loggable::prefix

Explainable에 포함된 상수

Explainable::ALL_PLANS_EXECUTION, Explainable ::EXECUTION_STATS, Explainable::QUERY_PLANNER

인스턴스 속성 요약 접기

Aggregation::Behavior에포함된 속성

#view

인스턴스 메서드 요약 접기

Retryable에 포함된 메서드

#read_Worker, #select_server, #write_Worker

Aggregation::Behavior에포함된 메서드

#allow_disk_use, #explain, #timeout_ms, # 쓰기 (write)?

Loggable에 포함된 메서드

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Explainable에 포함된 메서드

#explain

Iterable에 포함된 메서드

#close_query

Mongo::CursorHost에 포함된 메서드

#validate_timeout_mode!

생성자 세부 정보

#initialize(view, 파이프라인, changes_for, options = {}) ⇒ ChangeStream

제공된 collection 보기, 파이프라인 및 옵션에 대한 변경 스트림을 초기화합니다.

예시:

새 변경 스트림 보기를 만듭니다.

ChangeStream.new(view, pipeline, options)

매개변수:

  • 보기 (Collection::View)

    컬렉션 보기입니다.

  • 파이프라인 (Array<Hash>)

    변경 알림 을 필터하다 하는 연산자 파이프라인 입니다.

  • 옵션 (해시) (기본값: {})

    변경 스트림 옵션.

옵션 해시(options):

  • :full_document (string)

    허용되는 값: nil, 'default', 'updateLookup', 'whenAvailable', 'required'.

    기본값은 값을 전송하지 않는 것입니다(예: nil)이며, 'default'와 동일합니다. 기본적으로 부분 업데이트에 대한 변경 알림에는 문서의 변경 사항을 설명하는 델타가 포함됩니다.

    'updateLookup'으로 설정하면 부분 업데이트에 대한 변경 알림에 문서의 변경 사항을 설명하는 델타와 변경 발생 후 일정 기간 동안 변경된 전체 문서의 사본이 모두 포함됩니다.

    'whenAvailable'로 설정하면 이 이벤트에 대한 사후 이미지를 사용할 수 있는 경우 교체 및 업데이트 변경 이벤트에 대해 수정된 문서의 사후 이미지를 반환하도록 변경 스트림을 구성합니다.

    '필수'로 설정하면 사후 이미지를 사용할 수 없는 경우 오류가 발생한다는 점을 제외하면 'whenAvailable'과 동작이 동일합니다.

  • :full_document_before_change (string)

    허용되는 값: nil, 'whenAvailable', 'required', 'off'.

    기본값은 값을 전송하지 않는 것입니다(예: nil)은 'off'와 동일합니다.

    'whenAvailable'로 설정하면 사용 가능한 경우 바꾸기, 업데이트 및 삭제 변경 이벤트에 대해 수정된 문서의 사전 이미지를 반환하도록 변경 스트림을 구성합니다.

    '필수'로 설정하면, 사전 이미지를 사용할 수 없는 경우 오류가 발생한다는 점을 제외하면 'whenAvailable'과 동작이 동일합니다.

  • :resume_after (BSON::Document, Hash)

    새 변경 스트림의 논리적 시작점을 지정합니다.

  • :max_await_time_ms (정수)

    서버가 변경 스트림 쿼리를 충족하기 위해 새 문서를 기다리는 최대 시간입니다.

  • :batch_size (정수)

    배치당 반환할 문서 수입니다.

  • :collation (BSON::Document, Hash)

    사용할 데이터 정렬입니다.

  • :start_at_operation_time (BSON::Timestamp)

    지정된 타임스탬프 또는 그 이후에 발생한 변경 사항만 반환합니다. 서버에 대해 명령을 실행하면 여기에서 사용할 수 있는 클러스터 시간이 반환됩니다. 서버 버전 4.0이상에서만 인식됩니다.

  • :start_after (Bson::Document, Hash)

    :resume_after와 유사하게, 이 옵션은 재개 토큰을 사용하고 토큰 이후 첫 번째 알림을 반환하는 새 변경 스트림 을 시작합니다. 이를 통해 사용자는 삭제되었다가 다시 생성된 컬렉션 또는 새로 이름이 변경된 컬렉션을 알림 을 놓치지 않고 볼 수 있습니다.

  • :comment (객체)

    이 명령에 첨부할 사용자 제공 코멘트입니다.

  • :show_expanded_events (부울)

    서버가 변경 스트림 이벤트의 '확장' 목록을 보낼 수 있도록 합니다. 이 플래그 세트에 포함된 추가 이벤트 목록은 createIndexes, dropIndexes, Modify, create, shardCollection, reshardCollection, refineCollectionShardKey입니다.

    'startAfter'와 'resumeAfter'가 모두 지정된 경우 서버는 오류를 보고합니다.

이후:

  • 2.5.0

[소스 보기]

133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 133

def 초기화(보기, 파이프라인, change_for, 옵션 = {})
  # 변경 스트림 커서는 :iterable만 가능하므로 허용하지 않습니다.
  # timeout_mode를 지정합니다.
  Perform_setup(보기, 옵션, forbid: %i[ timeout_mode ]) do
    @changes_for = change_for
    @change_stream_filters = 파이프라인 && 파이프라인.dup
    @start_after = @options[:start_after]
  end

  # 변경 스트림에서 추적하는 재개 토큰으로, 만 사용됩니다.
  # 커서가 없거나 커서가 없는 경우 토큰 재개
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # 변경 스트림을 재개할 때 다른 매개변수를 전송합니다.
  # 첫 번째 쿼리를 보낼 때와 비교
  @resuming = true
end

인스턴스 속성 세부 정보

#커서 커서(읽기 전용)

이 메서드는 비공개 API의 일부입니다. 이 방법은 향후 제거되거나 변경될 수 있으므로 가능하면 사용하지 않는 것이 좋습니다.

이 작업의 기본 커서 반환합니다.

반환합니다:

  • (Cursor)

    이 작업의 기본 커서

이후:

  • 2.5.0


67
68
69
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 67

def cursor
  @cursor
end

#optionsBSON::Document (읽기 전용)

변경 스트림 옵션을 반환합니다.

반환합니다:

  • (BSON::Document)

    변경 스트림 옵션.

이후:

  • 2.5.0


63
64
65
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 63

def 옵션
  @options
end

인스턴스 메서드 세부 정보

#close(opts = {}) ⇒ nil

참고:

이 메서드는 변경 스트림에서 사용하는 커서를 닫으려고 시도하며, 결과적으로 서버 측 변경 스트림 커서가 닫힙니다. 이 메서드는 서버 측 커서를 닫을 때 발생하는 모든 오류를 무시합니다.

변경 스트림을 닫습니다.

예시:

변경 스트림을 닫습니다.

stream.close

반환합니다:

  • (nil)

    항상 nil입니다.

이후:

  • 2.5.0

[소스 보기]

254
255
256
257
258
259
260
261
262
263
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 254

def 닫기(opts = {})
  하지 않는 한 닫힘?
    시작
      @cursor.닫기(opts)
    구출 오류::OperationFailure::패밀리, 오류::SocketError, 오류::SocketTimeoutError, 오류::MissingConnection
      # 무시
    end
    @cursor = nil
  end
end

#닫았나요?true, false

변경 스트림이 닫혔나요?

예시:

변경 스트림이 종료되었는지 확인합니다.

stream.closed?

반환합니다:

  • (true, false)

    변경 스트림이 닫힌 경우.

이후:

  • 2.5.0

[소스 보기]

273
274
275
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 273

def 닫힘?
  @cursor.nil?
end

#cursor_type객체

"변경 스트림은 tailable-awaitData 커서를 추상화한 것입니다..."

반환합니다:

  • :tailable_await

이후:

  • 2.5.0

[소스 보기]

307
308
309
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 307

def cursor_type
  :tailable_await
end

# {|Each| ... } ⇒ 열거자

변경 스트림에서 반환된 문서를 반복합니다.

이 메서드는 재개 가능한 오류에 대해 오류당 한 번씩 재시도합니다(오류가 두 번 연속으로 발생하면 두 번째 오류가 발생하고, 에서 복구된 오류는 오류 횟수를 0으로 재설정).

예시:

문서 스트림을 반복합니다.

stream.each do |document|
  p document
end

수율 매개변수:

  • (BSON::Document)

    변경 스트림 문서.

반환합니다:

  • (열거자)

    열거자입니다.

이후:

  • 2.5.0

[소스 보기]

169
170
171
172
173
174
175
176
177
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 169

def 
  올리다 StopIteration.신규 만약 닫힘?
  루프 do
    문서 = try_next
    yield 문서 만약 문서
  end
구출 StopIteration
  반환 self
end

#검사string

검사에 사용할 형식이 지정된 string 을 가져옵니다.

예시:

변경 스트림 객체 를 검사합니다.

stream.inspect

반환합니다:

  • (string)

    변경 스트림 검사.

이후:

  • 2.5.0

[소스 보기]

285
286
287
288
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 285

def 검사
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} 필터=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{resume_token}>"
end

#max_await_time_ms정수 | nil

이 변경 스트림 에 전달된 max_await_time_ms 옵션의 값을 반환합니다.

반환합니다:

  • (정수 | nil)

    max_await_time_ms 값

이후:

  • 2.5.0

[소스 보기]

322
323
324
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 322

def max_await_time_ms
  옵션[:max_await_time_ms]
end

#resume_tokenBSON::Document | nil

스트림 이 자동으로 재개하는 데 사용할 재개 토큰이 있는 경우 이를 반환합니다.

예시:

변경 스트림 재개 토큰을 가져옵니다.

stream.resume_token

반환합니다:

  • (BSON::Document | nil)

    변경 스트림 재개 토큰입니다.

이후:

  • 2.10.0

[소스 보기]

299
300
301
302
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 299

def resume_token
  cursor_resume_token = @cursor.resume_token 만약 @cursor
  cursor_resume_token || @resume_token
end

#timeout_mode객체

"변경 스트림...암시적으로 반복 모드 사용 "

반환합니다:

  • :iteration

이후:

  • 2.5.0

[소스 보기]

314
315
316
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 314

def timeout_mode
  :iteration
end

#to_enum객체

이후:

  • 2.5.0

[소스 보기]

227
228
229
230
231
232
233
234
235
236
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 227

def to_enum
  열거형 = super
  열거형.send(:instance_variable_set, '@obj', self)
  클래스 << 열거형
    def try_next
      @obj.try_next
    end
  end
  열거형
end

#try_nextBSON::Document | nil

변경 스트림 에서 문서 하나를 반환합니다(사용 가능한 경우).

재개 가능한 오류가 발생하면 한 번 재시도합니다.

변경 스트림이 닫히면 StopIteration을 발생시킵니다.

이 메서드는 서버 의 변경 사항을 최대 max_await_time_ms 밀리초 동안 기다리며, 변경 사항이 수신되지 않으면 nil을 반환합니다.

반환합니다:

  • (BSON::Document | nil)

    변경 스트림 문서 입니다.

다음을 발생시킵니다.

  • (StopIteration)

이후:

  • 2.6.0

[소스 보기]

191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# 파일 'lib/ Mongo/ 컬렉션/view/change_stream.rb', 줄 191

def try_next
  recreate_cursor! 만약 @timed_out

  올리다 StopIteration.신규 만약 닫힘?

  시작
    doc = @cursor.try_next
  구출 mongo::오류 => e
    # "시간 초과 오류로 인해 다음 호출이 실패하는 경우 드라이버는
    # 변경 스트림 무효화합니다. 후속 다음 호출은 반드시
    # 새 변경 스트림 설정하기 위해 재개 시도를 수행합니다.
    # 서버..."
    #
    # 그러나 SocketTimeoutErrors는 TimeoutErrors이지만,
    # change-stream-resumable. 기존(지정된) 동작을 보존하려면 다음을 수행합니다.
    # 오류가 다음과 같지 않은 경우에만 시간 초과를 계산합니다.
    # change-stream-resumable.
    @timed_out = e.is_a?(mongo::오류::시간 초과 오류) && !e.change_stream_resumable?

    올리다 하지 않는 한 @timed_out || e.change_stream_resumable?

    @resume_token = @cursor.resume_token
    올리다 e 만약 @timed_out

    recreate_cursor!(@cursor.컨텍스트)
    재시도
  end

  # 각 문서에 _id 가 있는지 확인해야 하므로
  # 작업할 재개 토큰이 있습니다.
  만약 doc && doc['_id'].nil?
    올리다 오류::MissingResumeToken
  end
  doc
end