Fluxos de change streams
Nesta página
- Visão geral
- Abrir um fluxo de alterações
- Selecione um escopo para assistir
- Filtrar os eventos
- Gerencie a saída
- Exemplo
- Exemplo de exibição: arquivos completos
- Saída de exemplo de arquivo completo
- Aplique operadores de agregação ao seu change stream
- Exemplo
- Dividir eventos de fluxo de grandes mudanças
- Incluir pré-imagens e pós-imagens
- Crie uma coleção com pré-imagem e pós-imagens habilitadas
- Exemplo de configuração de pré-imagem
- Exemplo de configuração pós-imagem
- Informações adicionais
- Documentação da API
- Entradas manuais do servidor
Visão geral
Neste guia, você pode aprender como usar um change stream para monitorar alterações em tempo real em seu reconhecimento de data center. Um change stream é um recurso do MongoDB Server que permite que sua aplicação assine alterações de dados em uma única collection, reconhecimento de data center ou sistema.
Você pode especificar um conjunto de operadores de agregação para filtrar e transformar os dados que seu aplicativo recebe. Ao se conectar a um sistema MongoDB v6.0 ou posterior, você também pode configurar os eventos para incluir os dados do documento antes e depois da alteração.
Saiba como abrir e configurar seu change stream nas seguintes seções:
Abrir um fluxo de alterações
Você pode abrir um fluxo de alterações para assinar tipos específicos de alterações de dados e produzir eventos de alteração em seu aplicativo.
Selecione um escopo para assistir
Para abrir um change stream, chame o método watch()
em uma instância de um MongoCollection
, MongoDatabase
ou MongoClient
.
Importante
Sistemas standalone do MongoDB não oferecem suporte a change streams porque o recurso exige um oplog de conjunto de réplicas. Para saber mais sobre o oplog, consulte a página de manual do Replica Set oplog MongoDB Server .
O objeto no qual você chama o método watch()
determina o escopo de eventos que o change stream escuta:
MongoCollection.watch()
monitora uma collection.MongoDatabase.watch()
monitora todas as coleções em um banco de dados.MongoClient.watch()
monitora todas as alterações no MongoDB deployment conectado.
Filtrar os eventos
O método watch()
usa um pipeline de agregação opcional como o primeiro parâmetro, que consiste em uma lista de estágios que podem ser usados para filtrar e transformar a saída do evento de alteração, conforme segue:
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
Observação
Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument()
do objeto ChangeStreamIterable
com o valor FullDocument.UPDATE_LOOKUP
da seguinte forma:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
Gerencie a saída
O método watch()
retorna uma instância de ChangeStreamIterable
, uma interface que oferece vários métodos para acessar, organizar e percorrer os resultados. O ChangeStreamIterable
também herda métodos de sua interface pai, MongoIterable
que implementa o núcleo da interface Java Iterable
.
Você pode chamar forEach()
no ChangeStreamIterable
para administrar eventos à medida que eles ocorrem, ou você pode usar o método iterator()
para gerar uma instância MongoChangeStreamCursor
que pode ser usada para cruzar os resultados.
Você pode chamar os seguintes métodos em uma instância do MongoChangeStreamCursor
:
hasNext()
: Verifica se há mais resultadosnext()
: retorna o próximo documento na coleçãotryNext()
: Retorna imediatamente o próximo elemento disponível no fluxo de alterações ounull
Importante
A iteração do cursor bloqueia o thread atual
A iteração por meio de um cursor usando forEach()
ou qualquer método iterator()
bloqueia o thread atual enquanto o change stream correspondente escuta eventos. Se o programa precisar continuar executando outra lógica, como processar solicitações ou responder à entrada do usuário, considere criar e ouvir seu change stream em um thread separado.
Ao contrário do MongoCursor
gerado por outras queries, um MongoChangeStreamCursor
associado a um change stream aguarda a chegada de um evento de alteração antes de gerar um resultado de next()
. Como resultado, as chamadas para next()
usando o MongoChangeStreamCursor
de um change stream nunca lançam um java.util.NoSuchElementException
.
Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamIterable
gerado por watch()
. Acesse o link para a documentação da API do ChangeStreamIterable
na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.
Exemplo
Este exemplo mostra como abrir um change stream na collection myColl
e imprimir eventos do change stream conforme eles ocorrem.
O driver armazena evento de change stream em uma variável do tipo ChangeStreamIterable
. No exemplo a seguir, especificamos que o driver deve preencher o objeto ChangeStreamIterable
com tipos Document
. Como resultado, o driver armazena evento individuais de change stream como objeto ChangeStreamDocument
.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
Uma operação de inserção na collection produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Exemplo de exibição: arquivos completos
Observação
Exemplo de configuração
Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB, consulte o guia Criar um MongoClient. Este exemplo também utiliza a coleção do movies
no banco de dados do sample_mflix
incluído nos conjuntos de dados de amostra do Atlas. Você pode carregá-los em seu banco de dados na camada grátis do MongoDB Atlas seguindo o Guia de Introdução ao Atlas.
Este exemplo demonstra como abrir um fluxo de alteração usando o método watch. O arquivo Watch.java
chama o método watch()
com um pipeline como argumento para filtrar somente os eventos "insert"
e "update"
. O arquivo WatchCompanion.java
insere, atualiza e exclui um documento.
Para usar os exemplos a seguir, execute os arquivos nesta ordem:
Execute o arquivo
Watch.java
.Execute o arquivo
WatchCompanion.java
.
Observação
O arquivo Watch.java
continuará em execução até que o arquivo WatchCompanion.java
seja executado.
Watch.java
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java
:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
Saída de exemplo de arquivo completo
Os aplicativos anteriores gerarão a seguinte saída:
Watch.java
capturará somente as operações insert
e update
, pois o agregação pipeline filtra a operação delete
:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion
imprimirá um resumo das operações concluídas:
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
Para saber mais sobre o método watch()
, consulte a seguinte documentação da API:
Aplique operadores de agregação ao seu change stream
Você pode passar um pipeline de agregação como um parâmetro para o método watch()
para especificar quais eventos de alteração o fluxo de alteração recebe.
Para saber quais operadores de aggregation sua versão do MongoDB Server suporta, consulte Modificar a saída do change stream.
Exemplo
O exemplo de código a seguir mostra como você pode aplicar um pipeline de agregação para configurar seu change stream para receber evento de alteração apenas para operações de inserção e atualização:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
Uma operação de atualização na collection produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Dividir eventos de fluxo de grandes mudanças
A partir do MongoDB 7.0, você pode usar o estágio de agregação $changeStreamSplitLargeEvent
para fazer a divisão de evento que excedem 16 MB em fragmentos menores.
Utilize o $changeStreamSplitLargeEvent
somente quando estritamente necessário. For example, use $changeStreamSplitLargeEvent
if your application requires full document pre- or post-images, and generates events that exceed 16 MB.
O estágio $changeStreamSplitLargeEvent retorna os fragmentos sequencialmente. Você pode acessar os fragmentos usando um cursor de change stream. Cada fragmento inclui um objeto SplitEvent
contendo os seguintes campos:
Campo | Descrição |
---|---|
| O índice do fragmento, começando em |
| O número total de fragmentos que compõem o evento de divisão |
O exemplo a seguir modifica seu change stream usando o estágio de aggregation $changeStreamSplitLargeEvent
para fazer a divisão de evento grandes:
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
Observação
Você pode ter apenas um estágio $changeStreamSplitLargeEvent
no seu pipeline de agregação, e ele deve ser o último estágio no pipeline.
Você pode chamar o método getSplitEvent()
no cursor do change stream para acessar o SplitEvent
, conforme mostrado no exemplo a seguir:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
Para obter mais informações sobre o estágio de agregação $changeStreamSplitLargeEvent
, consulte a documentação do servidor $changeStreamSplitLargeEvent .
Incluir pré-imagens e pós-imagens
Você pode configurar o evento de alteração para conter ou omitir os seguintes dados:
A pré-imagem, um documento que representa a versão do documento antes da operação, se existir
A pós-imagem, um documento que representa a versão do documento após a operação, se existir
Importante
Você pode habilitar pré e pós-imagens em coleções somente se seu sistema usar o MongoDB v6.0 ou posterior.
Para receber evento de change stream que incluem uma pré-imagem ou pós-imagem, você deve executar a seguinte ação:
Habilite pré-imagens e pós-imagens para a coleção em seu sistema do MongoDB.
Dica
Para saber como ativar as pré e pós-imagens em sua implantação, consulte Fluxo de alterações com pré e pós-imagens de documentos no manual do servidor.
Para saber como instruir o driver a criar uma collection com pré-imagens e pós-imagens habilitadas, consulte a seção Criar uma collection com pré-imagem e pós-imagens habilitadas .
Configure seu change stream para recuperar as pré-imagens e as pós-imagens ou as duas imagens.
Dica
Para configurar seu change stream para registrar a pré-imagem em eventos de alteração, consulte o Exemplo de configuração de pré-imagem.
Para configurar seu change stream para registrar a pós-imagem em eventos de alteração, consulte o Exemplo de configuração pós-imagem.
Crie uma coleção com pré-imagem e pós-imagens habilitadas
Para usar o driver para criar uma collection com as opções de pré-imagem e pós-imagem habilitadas, especifique uma instância de ChangeStreamPreAndPostImagesOptions
e chame o método createCollection()
, conforme mostrado no exemplo a seguir:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
Você pode alterar a opção pré-imagem e pós-imagem em uma coleção existente executando o comando collMod
a partir do MongoDB Shell. Para saber como executar esta operação, consulte a entrada sobre collMod no manual do servidor MongoDB.
Aviso
Se você ativou pré-imagens ou pós-imagens em uma collection, modificar essas configurações com collMod
pode fazer com que os change stream existentes nessa collection falhem.
Exemplo de configuração de pré-imagem
O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl
para incluir a pré-imagem e gerar quaisquer eventos de alteração:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
O exemplo anterior configura o change stream para utilizar a opção FullDocumentBeforeChange.REQUIRED
. Essa opção configura o change stream para exigir pré-imagens para substituir, atualizar e excluir eventos de alteração. Se a pré-imagem não estiver disponível, o driver chamará um erro.
Suponha que você atualize o valor do campo amount
em um documento de 150
para 2000
. Esse evento de alteração produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
Para obter uma lista de opções, consulte o FullDocumentBeforeChange Documentação da API.
Exemplo de configuração pós-imagem
O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl
para incluir a pré-imagem e gerar quaisquer eventos de alteração:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
O exemplo anterior configura o change stream para utilizar a opção FullDocument.WHEN_AVAILABLE
. Essa opção configura o change stream para retornar a pós-imagem do documento para substituir e atualizar eventos de alteração, se estiver disponível.
Suponha que você atualize o valor do campo color
em um documento de "purple"
para "pink"
. O evento de alteração produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
Para obter uma lista de opções, consulte o FullDocument Documentação da API.
Informações adicionais
Documentação da API
Para obter mais informações sobre os métodos e classes usados para gerenciar change streams, consulte a seguinte documentação da API: