Fique atento às mudanças
Você pode acompanhar as alterações nos dados do MongoDB, como alterações em uma coleção, banco de dados ou sistema, abrindo um change stream. Um change stream permite que os aplicativos observem as alterações nos dados e reajam a elas.
O change stream gera documentos de evento de alteração quando ocorrem alterações. Um evento de alteração contém informações sobre os dados atualizados.
Abra um change stream chamando o método watch()
em um objeto MongoCollection
, MongoDatabase
ou MongoClient
como demonstrado no seguinte exemplo de código:
ChangeStreamIterable<Document> changeStream = database.watch();
Opcionalmente, o método watch()
usa uma aggregation pipeline pipeline que consiste em uma array de estágios como o primeiro parâmetro para filtrar e transformar a saída do evento de alteração da seguinte forma:
List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
O método watch()
retorna uma instância do ChangeStreamIterable
, uma classe que oferece vários métodos para acessar, organizar e percorrer os resultados. O ChangeStreamIterable
também herda métodos de sua classe pai, MongoIterable
que implementa o núcleo da interface Java Iterable
.
Você pode chamar forEach()
no ChangeStreamIterable
para administrar eventos à medida que eles ocorrem, ou você pode usar o método iterator()
para gerar uma instância MongoCursor
que pode ser usada para cruzar os resultados.
Você pode chamar métodos no MongoCursor
, como hasNext()
, para conferir se existem resultados adicionais, next()
para gerar o próximo documento na collection ou tryNext()
para gerar imediatamente o próximo elemento disponível no change stream ou null
. Ao contrário do MongoCursor
gerado por outras queries, um MongoCursor
associado a um change stream aguarda a chegada de um evento de alteração antes de gerar um resultado de next()
. Como resultado, chamadas para next()
usando o MongoCursor
de um change stream nunca lançará um java.util.NoSuchElementException
.
Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamIterable
gerado por watch()
. Acesse o link para a documentação da API do ChangeStreamIterable
na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.
Como processar eventos do change stream com uma chamada de resposta
Para capturar eventos de um change stream, chame o método forEach()
com uma função de chamada de resposta, conforme mostrado abaixo:
changeStream.forEach(event -> System.out.println("Change observed: " + event));
A função de chamada de resposta é acionada se um evento de alteração é emitido. Você pode especificar a lógica na chamada de resposta para processar o documento do evento quando ele for recebido.
Importante
forEach() bloqueia o thread atual
As chamadas para forEach()
bloqueiam o thread atual, desde que o change stream correspondente escute os eventos. Se o programa precisar continuar executando outra lógica, como processar solicitações ou responder à entrada do usuário, considere criar e ouvir seu change stream em um thread separado.
Observação
Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument()
do objeto ChangeStreamIterable
com o valor FullDocument.UPDATE_LOOKUP
da seguinte forma:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
Exemplo
O exemplo a seguir usa dois aplicativos separados para demonstrar como identificar alterações usando um change stream:
O primeiro aplicativo, denominado
Watch
, abre um change stream na collectionmovies
no banco de dadossample_mflix
.Watch
usa um aggregation pipeline para filtrar as alterações com base emoperationType
para que ele receba apenas eventos de inserção e atualização (as exclusões são excluídas por omissão).Watch
usa um chamada de resposta para receber e imprimir os eventos de alteração filtrados que ocorrem na collection.O segundo aplicativo, chamado
WatchCompanion
, insere um único documento na collectionmovies
no banco de dadossample_mflix
. Em seguida,WatchCompanion
atualiza o documento com um novo valor de campo. Finalmente,WatchCompanion
exclui o documento.
Primeiro, execute Watch
para abrir o change stream na collection e defina uma chamada de resposta no change stream usando o método forEach()
. Enquanto o Watch
estiver em execução, execute o WatchCompanion
para gerar eventos de alteração executando alterações na collection.
Observação
Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB , consulte oguia de conexão .
Watch
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package usage.examples; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion
:
// Performs CRUD operations to generate change events when run with the Watch application package usage.examples; import java.util.Arrays; import org.bson.Document; import org.bson.types.ObjectId; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Success! Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
Se você executar os aplicativos anteriores em sequência, deverá visualizar a saída do aplicativo Watch
que é semelhante ao seguinte. Somente as operações insert
e update
são impressas, pois o aggregation pipeline filtra a operação delete
:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
Você também deve visualizar a saída do aplicativo WatchCompanion
, que é semelhante ao seguinte:
Success! Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
Dica
Legacy API
Se você estiver usando a API herdada, consulte nossa página de perguntas frequentes para saber quais alterações devem ser feitas nesse exemplo de código.
Para obter informações adicionais sobre as classes e métodos mencionados nesta página, consulte os seguintes recursos:
Change Streams Entrada manual do servidor
Eventos de alteração Entrada manual do servidor
Pipeline de agregação Entrada manual do servidor
Estágios de agregação Entrada manual do servidor
ChangeStreamIterable Documentação da API
MongoCollection.watch() Documentação da API
MongoDatabase.watch() Documentação da API
MongoClient.watch() Documentação da API