Menu Docs
Página inicial do Docs
/ / /
Driver Ruby MongoDB
/

Fluxos de alterações

Nesta página

  • Observando alterações em uma collection
  • Observando alterações em um banco de dados
  • Observando alterações em um cluster
  • Fechando um change stream
  • Retomando um fluxo de alterações

A partir da versão 3.6 do MongoDB Server, um novo estágio de pipeline $changeStream é suportado no framework de aggregation. Especificar esse estágio primeiro em um pipeline de agregação permite que os usuários solicitem o envio de notificações para todas as alterações em uma collection específica. A partir do MongoDB 4.0, o change stream é compatível com reconhecimento de data center e clusters, além de collection.

O driver Ruby fornece uma API para receber notificações para alterações em uma collection, reconhecimento de data center ou cluster específico usando esse novo estágio do pipeline. Embora seja possível criar um change stream usando diretamente o operador de pipeline e a framework de aggregation, é recomendável usar a API do driver descrita abaixo, pois o driver retoma o change stream uma vez se houver um tempo limite, um erro de rede, um erro de servidor indicando que está ocorrendo um failover ou outro tipo de erro que pode ser retomado.

O change stream no servidor exige uma referência de leitura "majority" ou nenhuma referência de leitura.

Os fluxos de mudança não funcionam corretamente com o JRuby devido ao problema documentado aqui. Ou seja, JRuby avalia cuidadosamente #next em um enumerador em um thread de background verde; portanto, chamar #next no change stream fará com que getMores seja chamado em um loop no background.

Um fluxo de alteração de coleção é criado chamando o método #watch em uma coleção:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Você também pode receber as notificações à medida que elas ficam disponíveis:

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
process(doc)
end

O método next bloqueia e pesquisa o cluster até que uma alteração esteja disponível. Use o método try_next para iterar um change stream sem bloquear; este método aguardará até max_await_time_ms milissegundos por alterações do servidor e, se nenhuma alteração for recebida, retornará nulo. Se houver um erro não retomável, next e try_next gerarão uma exceção. Consulte a seção Retomando um fluxo de alterações abaixo para obter um exemplo que lê as alterações de uma coleção indefinidamente.

O change stream pode usar filtros no formato de operador de pipeline de estrutura de aggregation:

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
{'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
])
enum = stream.to_enum
while doc = enum.next
process(doc)
end

Um fluxo de alterações de banco de dados notifica alterações em qualquer coleção dentro do banco de dados, bem como eventos em todo o banco de dados, como o banco de dados sendo descartado.

Um change stream do banco de dados é criado chamando o método #watch em um objeto do banco de dados:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Um change stream de cluster notifica alterações em qualquer collection, qualquer reconhecimento de data center dentro do cluster, bem como evento em todo o cluster.

Um change stream de cluster é criado chamando o método #watch em um objeto de cliente (não o objeto de cluster):

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Você pode fechar um change stream chamando seu método #close :

stream.close

Um change stream consiste em dois tipos de operações: a aggregation inicial e getMore solicitações para receber o próximo lote de alterações.

O driver tentará automaticamente cada operação getMore uma vez em erros de rede e quando o servidor retornar um erro indicando que mudou de estado (por exemplo, ele não é mais o principal). O driver não tenta novamente a agregação inicial.

Em termos práticos, isso significa que, por exemplo:

  • A chamada de collection.watch falhará se o cluster não tiver nós disponíveis suficientes para satisfazer a preferência de leitura "majority" .

  • Depois que collection.watch retornar com êxito, se o cluster sofrer uma eleição posteriormente ou perder um nó, mas se recuperar com rapidez suficiente, as leituras de change stream por meio dos métodos next ou each continuarão de forma transparente para a aplicação.

Para observar de forma indefinida e confiável as alterações sem perder as alterações ou processar uma alteração mais de uma vez, o aplicativo deve rastrear o token de retomada do change stream e reiniciar o change stream quando ele passar por condições de erro estendidas que fazem com que a retomada automática do driver também falhe . O trecho de código a seguir mostra um exemplo de iteração de um change stream indefinidamente, recuperando o token de retomada usando o método change stream resume_token e reiniciando o change stream usando a opção :resume_after em todos os erros do MongoDB ou de rede:

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
while doc = enum.next
process(doc)
token = stream.resume_token
end
rescue Mongo::Error
sleep 1
end
end

A iteração acima está bloqueando na chamada enum.next e não permite retomar o processamento caso o processo Ruby que executa este código seja encerrado. O driver também fornece o método try_next que retorna nil (após um pequeno período de espera) em vez de bloquear indefinidamente quando não há alterações no change stream. Usando o método try_next , o token de retomada pode ser mantido após cada solicitação getMore , mesmo quando uma solicitação específica não retorna nenhuma alteração, de modo que o token de retomada permaneça na parte superior do oplog e o aplicativo tenha a oportunidade de persista caso o processo de tratamento de alterações termine:

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
doc = enum.try_next
if doc
process(doc)
end
token = stream.resume_token
# Persist +token+ to support resuming processing upon process restart
rescue Mongo::Error
sleep 1
end
end

Observe que o token de retomada deve ser recuperado do change stream após cada chamada try_next , mesmo que a chamada não tenha gerado nenhum documento.

O token de retomada também é fornecido no campo _id de cada documento do change stream. A leitura do campo _id não é recomendada porque ele pode ser projetado pelo aplicativo e porque o uso apenas do campo _id não avançaria o token de retomada quando um getMore não retornar documentos.

Voltar

GridFS