Monitorar alterações de dados
Nesta página
Visão geral
Neste guia, você pode aprender como usar um fluxo de alterações para monitorar alterações em tempo real em seus dados. Um change stream é uma funcionalidade do MongoDB Server que permite que seu aplicação se inscreva em alterações de dados em uma collection, banco de dados de dados ou sistema.
Ao utilizar o driver Scala, você pode chamar o método watch()
para retornar uma instância do ChangeStreamObservable
. Em seguida, você pode assinar a instância do ChangeStreamObservable
para ver novas alterações de dados, como atualizações, inserções e exclusões.
Dados de amostra
Os exemplos neste guia utilizam a restaurants
coleção do sample_restaurants
no banco de dados do a partir dos conjuntos de dados de amostra do Atlas . Para acessar essa collection a partir do seu aplicação Scala, crie um MongoClient
que se conecte a um Atlas cluster e atribua os seguintes valores às suas variáveis database
collection
e:
val database: MongoDatabase = client.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Dica
Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de amostra, consulte o guia Iniciar com Atlas .
Alguns exemplos usam instâncias da classe LatchedObserver
para lidar com eventos de fluxo de alterações. Essa classe é um observador personalizado que imprime eventos de fluxo de alteração e continua monitorando as alterações de dados até que o fluxo seja concluído ou gerado um erro. Para utilizar a classe LatchedObserver
, cole o seguinte código em seu arquivo de aplicação :
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() }
Abrir um fluxo de alterações
Para abrir um fluxo de alteração, chame o método watch()
. A instância na qual você chama o método watch()
determina o escopo de eventos que o change stream monitora. Você pode chamar o método watch()
em instâncias das seguintes classes:
MongoClient
: monitora alterações em todas as collections em todos os bancos de dados em um sistema, excluindo as collections do sistema ou as collections nosadmin
local
config
bancos de dados, eMongoDatabase
: Monitora alterações em todas as coleções em um banco de dadosMongoCollection
: monitora alterações em uma coleção
O exemplo a seguir chama o método watch()
para abrir um fluxo de alteração na coleção restaurants
. O código cria uma instância do LatchedObserver
para receber e produzir alterações conforme ocorrem:
val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await()
Para começar a observar as alterações, execute o código anterior. Em seguida, em uma shell separada, execute o seguinte código para atualizar um documento que tenha um valor de campo name
de "Blarney Castle"
:
val filter = equal("name", "Blarney Castle") val update = set("cuisine", "American") collection.updateOne(filter, update) .subscribe((res: UpdateResult) => println(res), (e: Throwable) => println(s"There was an error: $e"))
Quando você executa o código anterior para atualizar a coleção, o aplicação de fluxo de alterações imprime a alteração conforme ela ocorre. O evento de alteração impresso se assemelha à seguinte saída:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Modificar a saída change stream
Para modificar a saída do change stream, passe uma lista de estágios do pipeline como parâmetro para o método watch()
. Você pode incluir os seguintes estágios na lista:
$addFields
ou$set
: adiciona novos campos aos documentos$match
: filtra os documentos$project
: projeta um subconjunto dos campos do documento$replaceWith
ou$replaceRoot
: substitui o documento de entrada pelo documento especificado$redact
: restringe o conteúdo dos documentos$unset
: remove campos de documentos
O driver Scala fornece a classe Aggregates
, que inclui métodos de assistente para criar os estágios anteriores do pipeline.
Dica
Para saber mais sobre os estágios do pipeline e seus métodos assistente Aggregates
correspondentes, consulte os seguintes recursos:
Estágios de agregação no manual do MongoDB Server
Agrega na documentação da API
O exemplo a seguir cria um pipeline que usa o método Aggregates.filter()
para construir o estágio $match
. Em seguida, o código passa esse pipeline para o método watch()
e instrui watch()
a gerar eventos somente quando ocorrerem operações de atualização:
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update")))) observer.await()
Modificar comportamento do watch()
Você pode modificar o comportamento do método watch()
encadeando métodos fornecidos pela classe ChangeStreamObservable
. A tabela a seguir descreve alguns desses métodos:
Método | Descrição |
---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Attaches a comment to the operation. |
| Instructs the change stream to provide only changes that occurred at or after
the specified timestamp. |
| Sets the collation to use for the change stream cursor. |
Para obter uma lista completa das watch()
opções, consulte ChangeStreamObservable na documentação da API.
Incluir pré-imagens e pós-imagens
Importante
Você pode habilitar pré-imagens e pós-imagens em collections somente se seu sistema usar o MongoDB Server v6.0 ou posterior.
Por padrão, quando você executa uma operação em uma collection, o evento de alteração correspondente inclui somente os campos modificados e seus valores antes e depois da operação.
Você pode instruir o watch()
método a retornar a pré-imagem do documento , a versão completa do documento antes das alterações, além dos campos modificados. Para incluir a pré-imagem no change evento, encadeie o fullDocumentBeforeChange()
método watch()
a. Passe um dos seguintes valores para o fullDocumentBeforeChange()
método :
FullDocumentBeforeChange.WHEN_AVAILABLE
: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração. Se a pré-imagem não estiver disponível, esse campo de evento de alteração terá um valornull
.FullDocumentBeforeChange.REQUIRED
: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração. Se a pré-imagem não estiver disponível, o servidor gerará um erro.
Você também pode instruir o watch()
método a retornar a pós-imagem do documento , a versão completa do documento após as alterações, além dos campos modificados. Para incluir a pós-imagem no evento de fluxo de alterações, encadeie o fullDocument()
método watch()
a. Passe um dos seguintes valores para o fullDocument()
método :
FullDocument.UPDATE_LOOKUP
: o evento de alteração inclui uma cópia de todo o documento alterado de algum tempo após a alteração.FullDocument.WHEN_AVAILABLE
: o evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração. Se a pós-imagem não estiver disponível, esse campo de evento de alteração terá um valornull
.FullDocument.REQUIRED
: o evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração. Se a pós-imagem não estiver disponível, o servidor gerará um erro.
O exemplo a seguir chama o método watch()
em uma coleção e inclui a pós-imagem de documentos atualizados encadeando o método fullDocument()
:
val observer = LatchedObserver() collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .subscribe(observer) observer.await()
Com o aplicação de fluxo de alterações em execução em um shell separado, atualizar um documento na restaurants
coleção usando o exemplo de atualização anterior imprime um evento de alteração que se assemelha à seguinte saída:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription= UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Dica
Para saber mais sobre pré e pós-imagens, consulte Change Streams com pré e pós-imagens de documentos no manual do MongoDB Server .
Informações adicionais
Para saber mais sobre fluxos de alterações, consulte Change Streams de alterações no manual do MongoDB Server .
Documentação da API
Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API: