Menu Docs

Fluxos de change streams

Neste guia, você pode aprender como usar um change stream para monitorar alterações em tempo real em seu reconhecimento de data center. Um change stream é um recurso do MongoDB Server que permite que sua aplicação assine alterações de dados em uma única collection, reconhecimento de data center ou sistema.

Você pode especificar um conjunto de operadores de agregação para filtrar e transformar os dados que seu aplicativo recebe. Ao se conectar a um sistema MongoDB v6.0 ou posterior, você também pode configurar os eventos para incluir os dados do documento antes e depois da alteração.

Saiba como abrir e configurar seu change stream nas seguintes seções:

Você pode abrir um fluxo de alterações para assinar tipos específicos de alterações de dados e produzir eventos de alteração em seu aplicativo.

Para abrir um change stream, chame o método watch() em uma instância de um MongoCollection, MongoDatabase ou MongoClient.

Importante

Sistemas standalone do MongoDB não oferecem suporte a change streams porque o recurso exige um oplog de conjunto de réplicas. Para saber mais sobre o oplog, consulte a página de manual do Replica Set oplog MongoDB Server .

O objeto no qual você chama o método watch() determina o escopo de eventos que o change stream escuta:

  • MongoCollection.watch() monitora uma collection.

  • MongoDatabase.watch() monitora todas as coleções em um banco de dados.

  • MongoClient.watch() monitora todas as alterações no MongoDB deployment conectado.

O método watch() usa um pipeline de agregação opcional como o primeiro parâmetro, que consiste em uma lista de estágios que podem ser usados para filtrar e transformar a saída do evento de alteração, conforme segue:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

Observação

Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument() do objeto ChangeStreamIterable com o valor FullDocument.UPDATE_LOOKUP da seguinte forma:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

O método watch() retorna uma instância de ChangeStreamIterable, uma interface que oferece vários métodos para acessar, organizar e percorrer os resultados. O ChangeStreamIterable também herda métodos de sua interface pai, MongoIterable que implementa o núcleo da interface Java Iterable.

Você pode chamar forEach() no ChangeStreamIterable para administrar eventos à medida que eles ocorrem, ou você pode usar o método iterator() para gerar uma instância MongoChangeStreamCursor que pode ser usada para cruzar os resultados.

Você pode chamar os seguintes métodos em uma instância do MongoChangeStreamCursor :

  • hasNext(): Verifica se há mais resultados

  • next(): retorna o próximo documento na coleção

  • tryNext(): Retorna imediatamente o próximo elemento disponível no fluxo de alterações ou null

Importante

A iteração do cursor bloqueia o thread atual

A iteração por meio de um cursor usando forEach() ou qualquer método iterator() bloqueia o thread atual enquanto o change stream correspondente escuta eventos. Se o programa precisar continuar executando outra lógica, como processar solicitações ou responder à entrada do usuário, considere criar e ouvir seu change stream em um thread separado.

Ao contrário do MongoCursor gerado por outras queries, um MongoChangeStreamCursor associado a um change stream aguarda a chegada de um evento de alteração antes de gerar um resultado de next(). Como resultado, as chamadas para next() usando o MongoChangeStreamCursor de um change stream nunca lançam um java.util.NoSuchElementException.

Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamIterable gerado por watch(). Acesse o link para a documentação da API do ChangeStreamIterable na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.

Este exemplo mostra como abrir um change stream na collection myColl e imprimir eventos do change stream conforme eles ocorrem.

O driver armazena evento de change stream em uma variável do tipo ChangeStreamIterable. No exemplo a seguir, especificamos que o driver deve preencher o objeto ChangeStreamIterable com tipos Document . Como resultado, o driver armazena evento individuais de change stream como objeto ChangeStreamDocument .

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

Uma operação de inserção na collection produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

Observação

Exemplo de configuração

Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB, consulte o guia Criar um MongoClient. Este exemplo também utiliza a coleção do movies no banco de dados do sample_mflix incluído nos conjuntos de dados de amostra do Atlas. Você pode carregá-los em seu banco de dados na camada grátis do MongoDB Atlas seguindo o Guia de Introdução ao Atlas.

Este exemplo demonstra como abrir um fluxo de alteração usando o método watch. O arquivo Watch.java chama o método watch() com um pipeline como argumento para filtrar somente os eventos "insert" e "update". O arquivo WatchCompanion.java insere, atualiza e exclui um documento.

Para usar os exemplos a seguir, execute os arquivos nesta ordem:

  1. Execute o arquivo Watch.java.

  2. Execute o arquivo WatchCompanion.java.

Observação

O arquivo Watch.java continuará em execução até que o arquivo WatchCompanion.java seja executado.

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

Os aplicativos anteriores gerarão a seguinte saída:

Watch.java capturará somente as operações insert e update, pois o agregação pipeline filtra a operação delete:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion imprimirá um resumo das operações concluídas:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Para saber mais sobre o método watch() , consulte a seguinte documentação da API:

Você pode passar um pipeline de agregação como um parâmetro para o método watch() para especificar quais eventos de alteração o fluxo de alteração recebe.

Para saber quais operadores de aggregation sua versão do MongoDB Server suporta, consulte Modificar a saída do change stream.

O exemplo de código a seguir mostra como você pode aplicar um pipeline de agregação para configurar seu change stream para receber evento de alteração apenas para operações de inserção e atualização:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

Uma operação de atualização na collection produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

A partir do MongoDB 7.0, você pode usar o estágio de agregação $changeStreamSplitLargeEvent para fazer a divisão de evento que excedem 16 MB em fragmentos menores.

Utilize o $changeStreamSplitLargeEvent somente quando estritamente necessário. For example, use $changeStreamSplitLargeEvent if your application requires full document pre- or post-images, and generates events that exceed 16 MB.

O estágio $changeStreamSplitLargeEvent retorna os fragmentos sequencialmente. Você pode acessar os fragmentos usando um cursor de change stream. Cada fragmento inclui um objeto SplitEvent contendo os seguintes campos:

Campo
Descrição

fragment

O índice do fragmento, começando em 1

of

O número total de fragmentos que compõem o evento de divisão

O exemplo a seguir modifica seu change stream usando o estágio de aggregation $changeStreamSplitLargeEvent para fazer a divisão de evento grandes:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

Observação

Você pode ter apenas um estágio $changeStreamSplitLargeEvent no seu pipeline de agregação, e ele deve ser o último estágio no pipeline.

Você pode chamar o método getSplitEvent() no cursor do change stream para acessar o SplitEvent , conforme mostrado no exemplo a seguir:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

Para obter mais informações sobre o estágio de agregação $changeStreamSplitLargeEvent , consulte a documentação do servidor $changeStreamSplitLargeEvent .

Você pode configurar o evento de alteração para conter ou omitir os seguintes dados:

  • A pré-imagem, um documento que representa a versão do documento antes da operação, se existir

  • A pós-imagem, um documento que representa a versão do documento após a operação, se existir

Importante

Você pode habilitar pré e pós-imagens em coleções somente se seu sistema usar o MongoDB v6.0 ou posterior.

Para receber evento de change stream que incluem uma pré-imagem ou pós-imagem, você deve executar a seguinte ação:

Para usar o driver para criar uma collection com as opções de pré-imagem e pós-imagem habilitadas, especifique uma instância de ChangeStreamPreAndPostImagesOptions e chame o método createCollection() , conforme mostrado no exemplo a seguir:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

Você pode alterar a opção pré-imagem e pós-imagem em uma coleção existente executando o comando collMod a partir do MongoDB Shell. Para saber como executar esta operação, consulte a entrada sobre collMod no manual do servidor MongoDB.

Aviso

Se você ativou pré-imagens ou pós-imagens em uma collection, modificar essas configurações com collMod pode fazer com que os change stream existentes nessa collection falhem.

O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl para incluir a pré-imagem e gerar quaisquer eventos de alteração:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

O exemplo anterior configura o change stream para utilizar a opção FullDocumentBeforeChange.REQUIRED . Essa opção configura o change stream para exigir pré-imagens para substituir, atualizar e excluir eventos de alteração. Se a pré-imagem não estiver disponível, o driver chamará um erro.

Suponha que você atualize o valor do campo amount em um documento de 150 para 2000. Esse evento de alteração produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

Para obter uma lista de opções, consulte o FullDocumentBeforeChange Documentação da API.

O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl para incluir a pré-imagem e gerar quaisquer eventos de alteração:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

O exemplo anterior configura o change stream para utilizar a opção FullDocument.WHEN_AVAILABLE . Essa opção configura o change stream para retornar a pós-imagem do documento para substituir e atualizar eventos de alteração, se estiver disponível.

Suponha que você atualize o valor do campo color em um documento de "purple" para "pink". O evento de alteração produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

Para obter uma lista de opções, consulte o FullDocument Documentação da API.

Para obter mais informações sobre os métodos e classes usados para gerenciar change streams, consulte a seguinte documentação da API: