Fluxos de alterações
Nesta página
A partir da versão 3.6 do MongoDB Server, um novo estágio de pipeline $changeStream
é suportado no framework de aggregation. Especificar esse estágio primeiro em um pipeline de agregação permite que os usuários solicitem o envio de notificações para todas as alterações em uma collection específica. A partir do MongoDB 4.0, o change stream é compatível com reconhecimento de data center e clusters, além de collection.
O driver Ruby fornece uma API para receber notificações para alterações em uma collection, reconhecimento de data center ou cluster específico usando esse novo estágio do pipeline. Embora seja possível criar um change stream usando diretamente o operador de pipeline e a framework de aggregation, é recomendável usar a API do driver descrita abaixo, pois o driver retoma o change stream uma vez se houver um tempo limite, um erro de rede, um erro de servidor indicando que está ocorrendo um failover ou outro tipo de erro que pode ser retomado.
O change stream no servidor exige uma referência de leitura "majority"
ou nenhuma referência de leitura.
Os fluxos de mudança não funcionam corretamente com o JRuby devido ao problema documentado aqui. Ou seja, JRuby avalia cuidadosamente #next
em um enumerador em um thread de background verde; portanto, chamar #next
no change stream fará com que getMores seja chamado em um loop no background.
Observando alterações em uma collection
Um fluxo de alteração de coleção é criado chamando o método #watch
em uma coleção:
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') collection = client[:test] stream = collection.watch collection.insert_one(a: 1) doc = stream.to_enum.next process(doc)
Você também pode receber as notificações à medida que elas ficam disponíveis:
stream = collection.watch enum = stream.to_enum while doc = enum.next process(doc) end
O método next
bloqueia e pesquisa o cluster até que uma alteração esteja disponível. Use o método try_next
para iterar um change stream sem bloquear; este método aguardará até max_await_time_ms milissegundos por alterações do servidor e, se nenhuma alteração for recebida, retornará nulo. Se houver um erro não retomável, next
e try_next
gerarão uma exceção. Consulte a seção Retomando um fluxo de alterações abaixo para obter um exemplo que lê as alterações de uma coleção indefinidamente.
O change stream pode usar filtros no formato de operador de pipeline de estrutura de aggregation:
stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } }, {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } } ]) enum = stream.to_enum while doc = enum.next process(doc) end
Observando alterações em um banco de dados
Um fluxo de alterações de banco de dados notifica alterações em qualquer coleção dentro do banco de dados, bem como eventos em todo o banco de dados, como o banco de dados sendo descartado.
Um change stream do banco de dados é criado chamando o método #watch
em um objeto do banco de dados:
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') database = client.database stream = database.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
Observando alterações em um cluster
Um change stream de cluster notifica alterações em qualquer collection, qualquer reconhecimento de data center dentro do cluster, bem como evento em todo o cluster.
Um change stream de cluster é criado chamando o método #watch
em um objeto de cliente (não o objeto de cluster):
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') stream = client.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
Fechando um change stream
Você pode fechar um change stream chamando seu método #close
:
stream.close
Retomando um fluxo de alterações
Um change stream consiste em dois tipos de operações: a aggregation inicial e getMore
solicitações para receber o próximo lote de alterações.
O driver tentará automaticamente cada operação getMore
uma vez em erros de rede e quando o servidor retornar um erro indicando que mudou de estado (por exemplo, ele não é mais o principal). O driver não tenta novamente a agregação inicial.
Em termos práticos, isso significa que, por exemplo:
A chamada de
collection.watch
falhará se o cluster não tiver nós disponíveis suficientes para satisfazer a preferência de leitura"majority"
.Depois que
collection.watch
retornar com êxito, se o cluster sofrer uma eleição posteriormente ou perder um nó, mas se recuperar com rapidez suficiente, as leituras de change stream por meio dos métodosnext
oueach
continuarão de forma transparente para a aplicação.
Para observar de forma indefinida e confiável as alterações sem perder as alterações ou processar uma alteração mais de uma vez, o aplicativo deve rastrear o token de retomada do change stream e reiniciar o change stream quando ele passar por condições de erro estendidas que fazem com que a retomada automática do driver também falhe . O trecho de código a seguir mostra um exemplo de iteração de um change stream indefinidamente, recuperando o token de retomada usando o método change stream resume_token
e reiniciando o change stream usando a opção :resume_after
em todos os erros do MongoDB ou de rede:
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum while doc = enum.next process(doc) token = stream.resume_token end rescue Mongo::Error sleep 1 end end
A iteração acima está bloqueando na chamada enum.next
e não permite retomar o processamento caso o processo Ruby que executa este código seja encerrado. O driver também fornece o método try_next
que retorna nil
(após um pequeno período de espera) em vez de bloquear indefinidamente quando não há alterações no change stream. Usando o método try_next
, o token de retomada pode ser mantido após cada solicitação getMore
, mesmo quando uma solicitação específica não retorna nenhuma alteração, de modo que o token de retomada permaneça na parte superior do oplog e o aplicativo tenha a oportunidade de persista caso o processo de tratamento de alterações termine:
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum doc = enum.try_next if doc process(doc) end token = stream.resume_token # Persist +token+ to support resuming processing upon process restart rescue Mongo::Error sleep 1 end end
Observe que o token de retomada deve ser recuperado do change stream após cada chamada try_next
, mesmo que a chamada não tenha gerado nenhum documento.
O token de retomada também é fornecido no campo _id
de cada documento do change stream. A leitura do campo _id
não é recomendada porque ele pode ser projetado pelo aplicativo e porque o uso apenas do campo _id
não avançaria o token de retomada quando um getMore
não retornar documentos.