Menu Docs
Página inicial do Docs
/ / /
Java síncrono
/

Fique atento às mudanças

Você pode acompanhar as alterações nos dados do MongoDB, como alterações em uma coleção, banco de dados ou sistema, abrindo um change stream. Um change stream permite que os aplicativos observem as alterações nos dados e reajam a elas.

O change stream gera documentos de evento de alteração quando ocorrem alterações. Um evento de alteração contém informações sobre os dados atualizados.

Abra um change stream chamando o método watch() em um objeto MongoCollection, MongoDatabase ou MongoClient como demonstrado no seguinte exemplo de código:

ChangeStreamIterable<Document> changeStream = database.watch();

Opcionalmente, o método watch() usa uma aggregation pipeline pipeline que consiste em uma array de estágios como o primeiro parâmetro para filtrar e transformar a saída do evento de alteração da seguinte forma:

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

O método watch() retorna uma instância do ChangeStreamIterable, uma classe que oferece vários métodos para acessar, organizar e percorrer os resultados. O ChangeStreamIterable também herda métodos de sua classe pai, MongoIterable que implementa o núcleo da interface Java Iterable.

Você pode chamar forEach() no ChangeStreamIterable para administrar eventos à medida que eles ocorrem, ou você pode usar o método iterator() para gerar uma instância MongoCursor que pode ser usada para cruzar os resultados.

Você pode chamar métodos no MongoCursor, como hasNext(), para conferir se existem resultados adicionais, next() para gerar o próximo documento na collection ou tryNext() para gerar imediatamente o próximo elemento disponível no change stream ou null. Ao contrário do MongoCursor gerado por outras queries, um MongoCursor associado a um change stream aguarda a chegada de um evento de alteração antes de gerar um resultado de next(). Como resultado, chamadas para next() usando o MongoCursor de um change stream nunca lançará um java.util.NoSuchElementException.

Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamIterable gerado por watch(). Acesse o link para a documentação da API do ChangeStreamIterable na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.

Para capturar eventos de um change stream, chame o método forEach() com uma função de chamada de resposta, conforme mostrado abaixo:

changeStream.forEach(event -> System.out.println("Change observed: " + event));

A função de chamada de resposta é acionada se um evento de alteração é emitido. Você pode especificar a lógica na chamada de resposta para processar o documento do evento quando ele for recebido.

Importante

forEach() bloqueia o thread atual

As chamadas para forEach() bloqueiam o thread atual, desde que o change stream correspondente escute os eventos. Se o programa precisar continuar executando outra lógica, como processar solicitações ou responder à entrada do usuário, considere criar e ouvir seu change stream em um thread separado.

Observação

Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument() do objeto ChangeStreamIterable com o valor FullDocument.UPDATE_LOOKUP da seguinte forma:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

O exemplo a seguir usa dois aplicativos separados para demonstrar como identificar alterações usando um change stream:

  • O primeiro aplicativo, denominado Watch, abre um change stream na collection movies no banco de dados sample_mflix. Watch usa um aggregation pipeline para filtrar as alterações com base em operationType para que ele receba apenas eventos de inserção e atualização (as exclusões são excluídas por omissão). Watch usa um chamada de resposta para receber e imprimir os eventos de alteração filtrados que ocorrem na collection.

  • O segundo aplicativo, chamado WatchCompanion, insere um único documento na collection movies no banco de dados sample_mflix. Em seguida, WatchCompanion atualiza o documento com um novo valor de campo. Finalmente, WatchCompanion exclui o documento.

Primeiro, execute Watch para abrir o change stream na collection e defina uma chamada de resposta no change stream usando o método forEach(). Enquanto o Watch estiver em execução, execute o WatchCompanion para gerar eventos de alteração executando alterações na collection.

Observação

Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB , consulte oguia de conexão .

Watch:

package usage.examples;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// variables referenced in a lambda must be final; final array gives us a mutable integer
final int[] numberOfEvents = {0};
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

Se você executar os aplicativos anteriores em sequência, deverá visualizar a saída do aplicativo Watch que é semelhante ao seguinte. Somente as operações insert e update são impressas, pois o aggregation pipeline filtra a operação delete:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

Você também deve visualizar a saída do aplicativo WatchCompanion, que é semelhante ao seguinte:

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Dica

Legacy API

Se você estiver usando a API herdada, consulte nossa página de perguntas frequentes para saber quais alterações devem ser feitas nesse exemplo de código.

Para obter informações adicionais sobre as classes e métodos mencionados nesta página, consulte os seguintes recursos:

Voltar

Realizar operações em massa