Menu Docs
Página inicial do Docs
/
Manual do MongoDB
/ / /

Mongo.watch()

Nesta página

  • Definição
  • Disponibilidade
  • Implantação
  • Mecanismo de armazenamento
  • Suporte para a preocupação de leitura majority
  • Comportamento
  • Retomabilidade
  • Pesquisa completa de documentos de operações de atualização
  • Disponibilidade
  • Controle de acesso
  • Iteração do cursor
  • Exemplo
Mongo.watch( pipeline, options )

Somente para conjuntos de réplica e clusters fragmentados

Abre umcursor de fluxo de alterações para um conjunto de réplicas ou um cluster fragmentado para relatar todas as suas coleções nãosystem em seus bancos de dados, com exceção dos bancos de dados admin, local e config .

Parâmetro
Tipo
Descrição
pipeline
array

Opcional. Um pipeline de agregação que consiste em um ou mais dos seguintes estágios de agregação :

Especifique um pipeline para filtrar/modificar o resultado dos eventos de alteração.

A partir do MongoDB 4.2, os fluxos de alteração lançarão uma exceção se o pipeline de agregação do fluxo de alterações modificar o campo _id de um evento.

options
documento
Opcional. Opções adicionais que modificam o comportamento do Mongo.watch().

O documento options pode conter os seguintes campos e valores:

Campo
Tipo
Descrição
resumeAfter
documento

Opcional. Direciona Mongo.watch() para tentar retomar as notificações a partir da operação especificada no token de retomada.

Cada documento de evento de fluxo de alteração inclui um token de currículo como o campo _id. Passe todo o _id campo do documento do evento de alteração que representa a operação que você deseja retomar depois.

resumeAfter é mutuamente exclusivo com startAfter e startAtOperationTime.

startAfter
documento

Opcional. Direciona Mongo.watch() para tentar iniciar um novo fluxo de alterações após a operação especificada no token de retomada. Permite que as notificações sejam retomadas após um evento de invalidação.

Cada documento de evento de fluxo de alteração inclui um token de currículo como o campo _id. Passe todo o _id campo do documento do evento de alteração que representa a operação que você deseja retomar depois.

startAfter é mutuamente exclusivo com resumeAfter e startAtOperationTime.

fullDocument
string

Opcional. Por padrão, Mongo.watch() retorna o delta dos campos modificados por uma operação de atualização, em vez de todo o documento atualizado.

Defina fullDocument como "updateLookup" para direcionar Mongo.watch() para procurar a versão mais atual confirmada por maioria do documento atualizado. Mongo.watch() retorna um campo fullDocument com a pesquisa de documento, além do delta updateDescription.

batchSize
int

Opcional. Especifica o número máximo de eventos de alteração a serem retornados em cada lote da resposta do cluster MongoDB.

Tem a mesma funcionalidade que cursor.batchSize().

maxAwaitTimeMS
int

Opcional. A quantidade máxima de tempo em milissegundos que o servidor aguarda por novas alterações de dados para relatar ao cursor do fluxo de alterações antes de retornar um lote vazio.

Padrão para 1000 milissegundos.

collation
documento

Opcional. Passe um documento de agrupamento para especificar um agrupamento para o cursor do change stream.

Se omitido, o padrão é simple comparação binária.

startAtOperationTime
Timestamp

Opcional. O ponto de partida para o fluxo de alterações. Se o ponto de partida especificado estiver no passado, ele deverá estar no intervalo de tempo do oplog. Para verificar o intervalo de tempo do oplog, consulte rs.printReplicationInfo().

startAtOperationTime é mutuamente exclusivo com resumeAfter e startAfter.

Retorna:Um cursor sobre os documentos do evento de alteração. Consulte Eventos de mudança para obter exemplos de documentos de eventos de mudança.

Dica

Veja também:

Mongo.watch() está disponível para conjuntos de réplicas e clusters fragmentados:

  • Em um conjunto de réplicas, você pode emitir Mongo.watch() em qualquer membro portador de dados.

  • Em um cluster fragmentado, você deve emitir Mongo.watch() em uma instância mongos.

Você só pode usar Mongo.watch() com o mecanismo de armazenamento Wired Tiger.

A partir do MongoDB 4.2, os fluxos de alteração estão disponíveis independentemente do suporte à "majority" de leitura; ou seja, o suporte a majority de leitura pode ser habilitado (padrão) ou desabilitado para usar fluxos de alteração.

No MongoDB 4.0 e versões anteriores, os fluxos de alteração estarão disponíveis somente se "majority" suporte a preocupações de leitura estiver habilitado (padrão).

  • Mongo.watch() notifica apenas sobre alterações de dados que persistiram para a maioria dos membros portadores de dados.

  • O cursor do fluxo de alterações permanece aberto até que ocorra uma das seguintes situações:

    • O cursor está explicitamente fechado.

    • Ocorre um evento de invalidar; por exemplo, um drop de coleção ou renomear.

    • A conexão com a implantação do MongoDB fecha ou expira. Consulte Comportamentos do cursor para obter mais informações.

    • Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado e o cursor de fluxo de alterações fechado pode não ser totalmente retomável.

Ao contrário dos drivers do MongoDB, omongosh não tenta retomar automaticamente um cursor de fluxo de alterações após um erro. Os drivers do MongoDB fazem uma tentativa de retomar automaticamente um cursor de fluxo de alterações após determinados erros.

Mongo.watch() usa informações armazenadas no oplog para produzir a descrição do evento de alteração e gerar um token de retomada associado a essa operação. Se a operação identificada pelo token de retomada passada para a opção resumeAfter ou startAfter já tiver sido descartada do oplog, Mongo.watch() não poderá retomar o fluxo de alterações.

Consulte Retomar um fluxo de alterações para obter mais informações sobre como retomar um fluxo de alterações.

Observação

  • Não é possível usar resumeAfter para retomar um fluxo de alterações depois que um evento de invalidação (por exemplo, um descarte ou renomeação de coleção) fechar o fluxo. Em vez disso, você pode usar startAfter para iniciar um novo fluxo de alterações após um evento de invalidação.

  • Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado e o cursor de fluxo de alterações fechado pode não ser totalmente retomável.

Observação

Retomar token

O token de retomada tipo _data depende das versões do MongoDB e, em alguns casos, da versão de compatibilidade de recursos (fcv) no momento da abertura/retomada do change stream (ou seja, uma alteração no valor da fcv não afeta os tokens de retomada de change stream já abertos):

Versão do MongoDB
Versão de compatibilidade de recursos
Token de retomada tipo _data
MongoDB 4.2 e posterior
"4.2" ou "4.0"
String codificada em hexadecimal (v1)
MongoDB 4.0.7 e posterior
"4.0" ou "3.6"
String codificada em hexadecimal (v1)
MongoDB 4.0.6 e mais antiga
"4.0"
String codificada em hexadecimal (v0)
MongoDB 4.0.6 e mais antiga
"3.6"
BinData
MongoDB 3.6
"3.6"
BinData

Com tokens de retomada de string codificados em hexadecimal, é possível comparar e classificar os tokens de retomada.

Independentemente do valor da fcv, um sistema 4.0 pode usar tokens de retomada BinData ou tokens de retomada de string hexadecimais para retomar um change stream. Dessa forma, um sistema 4.0 pode usar um token de retomada de um change stream aberto em uma collection de um sistema 3.6.

Novos formatos de token de retomada introduzidos em uma versão MongoDB não podem ser consumidos por versões anteriores do MongoDB.

O MongoDB fornece um "snippet", uma extensão para mongosh, que decodifica tokens de currículo codificados por hex.

Você pode instalar e executar o trecho resumetoken do mongosh:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

Você também pode executar resumetoken a partir da linha de comando (sem usar o mongosh) se o npm estiver instalado em seu sistema:

npx mongodb-resumetoken-decoder <RESUME TOKEN>

Consulte o seguinte para obter mais detalhes sobre:

Por padrão, o cursor de fluxo de alterações retorna alterações/deltas de campos específicos para operações de atualização. Você também pode configurar o fluxo de alterações para procurar e retornar a versão atual de compromisso majoritário do documento alterado. Dependendo de outras operações de gravação que podem ter ocorrido entre a atualização e a pesquisa, o documento retornado pode diferir significativamente do documento no momento da atualização.

Dependendo do número de alterações aplicadas durante a operação de atualização e o tamanho do documento completo, há o risco de que o tamanho do documento do evento de alteração para uma operação de atualização seja maior do que os 16MB que são o limite de documentos BSON. Se isso ocorrer, o servidor fechará o cursor de fluxo de alterações e retornará um erro.

A partir do MongoDB 4.2, os fluxos de alteração estão disponíveis independentemente do suporte à "majority" de leitura; ou seja, o suporte a majority de leitura pode ser habilitado (padrão) ou desabilitado para usar fluxos de alteração.

No MongoDB 4.0 e versões anteriores, os fluxos de alteração estarão disponíveis somente se "majority" suporte a preocupações de leitura estiver habilitado (padrão).

Ao executar com controle de acesso, o usuário deve ter as ações de privilégio find e changeStream em todas as collections que não são do sistema em todos os bancos de dados. Ou seja, um usuário deve ter uma role que conceda o seguinte privilégio:

{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

O papel do read embutido fornece os privilégios apropriados.

O MongoDB oferece várias maneiras de iterar em um cursor.

O método cursor.hasNext() bloqueia e aguarda o próximo evento. Para monitorar o cursor watchCursor e iterar sobre os eventos, use hasNext() assim:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

O método cursor.tryNext() não está bloqueando. Para monitorar o cursor watchCursor e iterar sobre os eventos, use tryNext() assim:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

A operação a seguir no mongosh abre um cursor de fluxo de alterações em um conjunto de réplicas. O cursor retornado relata as alterações de dados em todas as coleções que não sejam dosystem em todos os bancos de dados, exceto os bancos de dados admin, local e config.

watchCursor = db.getMongo().watch()

Itere o cursor para verificar novos eventos. Use o método cursor.isClosed() com o método cursor.tryNext() para garantir que o loop só saia se o cursor do fluxo de alteração estiver fechado e não houver objetos restantes no lote mais recente:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Para obter a documentação completa sobre a saída do fluxo de alterações, consulte Eventos de alteração.

Observação

Você não pode usar isExhausted() com alterar colunas.

Voltar

Mongo.setWriteConcern