Classe: Mongo::Collection::View::ChangeStream
- Herda:
-
Agregação
- Objeto
- Agregação
- Mongo::collection::View::ChangeStream
- Inclui:
- Aggregation::Behavior, Retryable
- Definido em:
- lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb mais...
Visão geral
Disponível apenas nas versões de servidor 3.6 e superiores.
Os ChangeStreams não funcionam corretamente com o JRuby devido ao problema documentado aqui: github.com/jruby/jruby/issues/{14212. Ou seja, JRuby avalia cuidadosamente #next em um enumerador em um thread de background verde, portanto, chamar #next no change stream fará com que getMores seja chamado em um loop no background.
Fornece comportamento em torno de um estágio de pipeline '$changeStream' na estrutura de agregação . A especificação desse estágio permite que os usuários solicitem o envio de notificações para todas as alterações em uma coleção ou banco de dados de dados específico.
Definido sob namespace
Módulos: Repetitivo
Colapsode resumo constante
- FULL_DOCUMENT_DEFAULT =
Retorna o valor padrão da opção fullDocument.
'padrão'.congelar
- DATABASE =
Retorna Usado para indicar que o fluxo de alterações deve escutar alterações em todo o banco de dados de dados e não apenas na coleção.
:database
- CLUSTER =
Retorna Usado para indicar que o fluxo de alterações deve escutar alterações em todo o cluster em vez de apenas na coleção.
:cluster
Constantes incluídas do Loggable
Constantes incluídas de Explainable
Explicável::ALL_PLANS_EXECUTION, Explicável:: EXECUTION_STATS , Explicável::QUERY_PLANNER
Recolhimento do Resumo do atributo de instância
-
#cursor ⇒ Cursor
Somente leitura
privado
O cursor subjacente para esta operação.
-
#options ⇒ BSON::Document
Somente leitura
As opções de change stream.
Atributos incluídos em Aggregation::Behavior
Recolhimento do Resumo do método de instância
-
#close(opts = {}) ➤ nil
Feche o fluxo de alterações.
-
#fechado? ➤ verdadeiro, falso
O change stream está fechado?
-
#cursor_type ➤ Objeto
“os change streams são uma abstração em torno dos cursores tailable-awaitData...”
-
#cada {|Cada| ... } ➤ Enumerador
Iterar por meio de documentos retornados pelo fluxo de alterações.
-
#initialize(view, pipeline, change_for, options = {}) ➤ ChangeStream
construtor
Inicialize o change stream para a visualização de collection, pipeline e opções fornecidas.
-
#inspecionar ➤ string
Obtenha uma string formatada para uso na inspeção.
-
#max_await_time_ms ➤ Inteiro | nada
Retorna o valor da opção max_await_time_ms que foi passada para esse change stream.
-
#resume_token ➤ BSON::Document | nada
Retorna o token de retomada que o stream usará para ser retomado automaticamente, se houver.
-
#timeout_mode ➤ Objeto
“alterar fluxos...usar implicitamente o modo iteração ".
- #to_enum ➤ Objeto
-
#try_next ➤ BSON::Document | nada
Retorne um documento do fluxo de alterações, se houver um disponível.
Métodos incluídos no Retryable
#read_worker, #select_server, #write_worker
Métodos incluídos no Aggregation::Behavior
#allow_disk_use, #explain, #timeout_ms, #write?
Métodos incluídos no Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_WARN, #logger
Métodos incluídos no Explainable
Métodos incluídos do Iterable
Métodos incluídos do Mongo::CursorHost
Detalhes do construtor
#initialize(view, pipeline, change_for, options = {}) ➤ ChangeStream
Inicialize o change stream para a visualização de collection, pipeline e opções fornecidas.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 133 def inicializar(vista, gasoduto, change_for, = {}) # os cursores de change stream só podem ser :iterable, então não permitimos # timeout_mode a ser especificado. perform_setup(vista, , forbid: %i[ timeout_mode ]) fazer @changes_for = change_for @change_stream_filters = gasoduto && gasoduto.dup @start_after = @opções[:start_after] end # O token de retomada rastreado pelo fluxo de alterações, usado apenas # quando não houver cursor ou nenhum token de retomada do cursor @resume_token = @start_after || @opções[:resume_after] create_cursor! # Enviamos parâmetros diferentes quando retomamos um change stream # em comparação com quando enviamos a primeira query @resuming = true end |
Detalhes do atributo da instância
#cursor ➤ Cursor (somente leitura)
Este método faz parte de uma API privada. Evite usar esse método, se possível, pois ele pode ser removido ou alterado no futuro.
Gera o cursor subjacente para esta operação.
67 68 69 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 67 def cursor @cursor end |
#opções ➤ BSON::Document (somente leitura)
Retorna as opções de change stream.
63 64 65 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 63 def @opções end |
Detalhes do método de instância
#close(opts = {}) ➤ nil
Este método tenta fechar o cursor usado pelo change stream, que, por sua vez, fecha o cursor do change stream no lado do servidor. Esse método ignora todos os erros que ocorrem ao fechar o cursor do lado do servidor.
Feche o fluxo de alterações.
254 255 256 257 258 259 260 261 262 263 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 254 def Fechar(opciona = {}) a menos que fechado? começar @cursor.Fechar(opciona) salvar Erro::Falha de operação::família, Erro::Erro de soquete, Erro::SocketTimeoutError, Erro::missingconnection #ignorar end @cursor = nada end end |
#fechado? ➤ true, false
O change stream está fechado?
273 274 275 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 273 def fechado? @cursor.nada? end |
#cursor_type ➤ Objeto
“os change streams são uma abstração em torno dos cursores tailable-awaitData...”
307 308 309 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 307 def cursor_type :tailable_await end |
#cada {|Cada| ... } ➤ Enumerador
Iterar por meio de documentos retornados pelo fluxo de alterações.
Esse método tenta novamente uma vez por erro em erros retomáveis (dois erros consecutivos resultam no segundo erro sendo gerado, um erro que é recuperado de redefine a contagem de erros para zero).
169 170 171 172 173 174 175 176 177 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 169 def cada aumentar StopIteration.Novo se fechado? loop fazer documento = try_next rendimento documento se documento end salvar StopIteration Método auto end |
#inspecionar ➤ string
Obtenha uma string formatada para uso na inspeção.
285 286 287 288 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 285 def inspecionar "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filter=#{@change_stream_filters} " + "options=#{@options} resume_token=#{resume_token}>" end |
#max_await_time_ms ➤ Inteiro | nada
Retorna o valor da opção max_await_time_ms que foi passada para esse change stream.
322 323 324 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 322 def max_await_time_ms [:max_await_time_ms] end |
#resume_token ➤ BSON::Document | nada
Retorna o token de retomada que o stream usará para ser retomado automaticamente, se houver.
299 300 301 302 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 299 def resume_token cursor_resume_token = @cursor.resume_token se @cursor cursor_resume_token || @resume_token end |
#timeout_mode ➤ Objeto
"alterar fluxos...implicitamente use o modo ITERATION "
314 315 316 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 314 def timeout_mode :iteration end |
#to_enum ➤ Objeto
227 228 229 230 231 232 233 234 235 236 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 227 def to_enum enum = super enum.enviar(:instance_variable_set, '@obj', auto) classe << enum def try_next @obj.try_next end end enum end |
#try_next ➤ BSON::Document | nada
Retorne um documento do fluxo de alterações, se houver um disponível.
Tenta novamente uma vez em um erro retomável.
Gera StopIteration se o change stream estiver fechado.
Este método aguardará até max_await_time_ms milissegundos por alterações do servidor e, se nenhuma alteração for recebida, retornará nulo.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/mongo/collection/view/change_stream.rb', linha 191 def try_next recriar_cursor! se @timed_out aumentar StopIteration.Novo se fechado? começar doc = @cursor.try_next salvar mongo::Erro => e # "Se a próxima chamada falhar com um erro de tempo limite, os drivers NÃO DEVEM # invalidar o fluxo de alterações. A próxima chamada subsequente DEVE # realize uma tentativa de retomada para estabelecer um novo fluxo de alterações no servidor... " # # No entanto, os SocketTimeoutErrors são TimeoutErrors, mas também são # change-stream-resumable. Para preservar o comportamento existente (especificado), # Só contamos os tempos limite quando o erro também não é # change-stream-resumable. @timed_out = e.is_a?(mongo::Erro::Erro de tempo limite) && !e.change_stream_resumable? aumentar a menos que @timed_out || e.change_stream_resumable? @resume_token = @cursor.resume_token aumentar e se @timed_out recriar_cursor!(@cursor.Contexto) tentar novamente end # Precisamos verificar se cada documento tem um _id, então podemos # tenha um token de currículo para trabalhar se doc && doc['_id'].nada? aumentar Erro::missingResumeToken end doc end |