Menu Docs
Página inicial do Docs
/ / /
Driver de sincronização Java
/ / /

Fluxos de change streams

Nesta página

  • Visão geral
  • Abrir um fluxo de alterações
  • Aplique operadores de agregação ao seu change stream
  • Dividir eventos de fluxo de grandes mudanças
  • Incluir pré-imagens e pós-imagens

Neste guia, você pode aprender como usar um change stream para monitorar alterações em tempo real em seu reconhecimento de data center. Um change stream é um recurso do MongoDB Server que permite que sua aplicação assine alterações de dados em uma única collection, reconhecimento de data center ou sistema.

Você pode especificar um conjunto de operadores de agregação para filtrar e transformar os dados que seu aplicativo recebe. Ao se conectar a um sistema MongoDB v6.0 ou posterior, você também pode configurar os eventos para incluir os dados do documento antes e depois da alteração.

Saiba como abrir e configurar seu change stream nas seguintes seções:

  • Abrir um fluxo de alterações

  • Aplique operadores de agregação ao seu change stream

  • Incluir pré-imagens e pós-imagens

Você pode abrir um fluxo de alterações para assinar tipos específicos de alterações de dados e produzir eventos de alteração em seu aplicativo.

Para abrir um change stream, chame o método watch() em uma instância de um MongoCollection, MongoDatabase ou MongoClient.

Importante

Sistemas standalone do MongoDB não oferecem suporte a change streams porque o recurso exige um oplog de conjunto de réplicas. Para saber mais sobre o oplog, consulte a página de manual do Replica Set oplog MongoDB Server .

O objeto no qual você chama o método watch() determina o escopo de eventos que o change stream escuta.

Se você chamar watch() em um MongoCollection, o change stream monitorará uma collection.

Se você chamar watch() em um MongoDatabase, o change stream monitorará todas as collection nesse reconhecimento de data center.

Se você chamar watch() em um MongoClient , o change stream monitorará todas as alterações na implantação do MongoDB conectada.

Este exemplo mostra como abrir um change stream na collection myColl e imprimir eventos do change stream conforme eles ocorrem.

O driver armazena evento de change stream em uma variável do tipo ChangeStreamIterable. No exemplo a seguir, especificamos que o driver deve preencher o objeto ChangeStreamIterable com tipos Document . Como resultado, o driver armazena evento individuais de change stream como objeto ChangeStreamDocument .

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

Uma operação de inserção na collection produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

Para obter um exemplo executável, consulte a página de exemplo de uso Fique atento às mudanças .

Para saber mais sobre o método watch() , consulte a seguinte documentação da API:

Você pode passar um pipeline de agregação como um parâmetro para o método watch() para especificar quais eventos de alteração o fluxo de alteração recebe.

Para saber quais operadores de aggregation sua versão do MongoDB Server suporta, consulte Modificar a saída do change stream.

O exemplo de código a seguir mostra como você pode aplicar um pipeline de agregação para configurar seu change stream para receber evento de alteração apenas para operações de inserção e atualização:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

Uma operação de atualização na collection produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

A partir do MongoDB 7.0, você pode usar o estágio de agregação $changeStreamSplitLargeEvent para fazer a divisão de evento que excedem 16 MB em fragmentos menores.

Utilize o $changeStreamSplitLargeEvent somente quando estritamente necessário. For example, use $changeStreamSplitLargeEvent if your application requires full document pre- or post-images, and generates events that exceed 16 MB.

O estágio $changeStreamSplitLargeEvent retorna os fragmentos sequencialmente. Você pode acessar os fragmentos usando um cursor de change stream. Cada fragmento inclui um objeto SplitEvent contendo os seguintes campos:

Campo
Descrição
fragment
O índice do fragmento, começando em 1
of
O número total de fragmentos que compõem o evento de divisão

O exemplo a seguir modifica seu change stream usando o estágio de aggregation $changeStreamSplitLargeEvent para fazer a divisão de evento grandes:

ChangeStreamIterable<Document> changeStream = collection.watch(
Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

Observação

Você pode ter apenas um estágio $changeStreamSplitLargeEvent no seu pipeline de agregação, e ele deve ser o último estágio no pipeline.

Você pode chamar o método getSplitEvent() no cursor do change stream para acessar o SplitEvent , conforme mostrado no exemplo a seguir:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

Para obter mais informações sobre o estágio de agregação $changeStreamSplitLargeEvent , consulte a documentação do servidor $changeStreamSplitLargeEvent .

Você pode configurar o evento de alteração para conter ou omitir os seguintes dados:

  • A pré-imagem, um documento que representa a versão do documento antes da operação, se existir

  • A pós-imagem, um documento que representa a versão do documento após a operação, se existir

Importante

Você pode habilitar pré e pós-imagens em coleções somente se seu sistema usar o MongoDB v6.0 ou posterior.

Para receber evento de change stream que incluem uma pré-imagem ou pós-imagem, você deve executar a seguinte ação:

  • Habilite pré-imagens e pós-imagens para a coleção em seu sistema do MongoDB.

    Dica

    Para saber como ativar as pré e pós-imagens em sua implantação, consulte Fluxo de alterações com pré e pós-imagens de documentos no manual do servidor.

    Para saber como instruir o driver a criar uma collection com pré-imagens e pós-imagens habilitadas, consulte a seção Criar uma collection com pré-imagem e pós-imagens habilitadas .

  • Configure seu change stream para recuperar as pré-imagens e as pós-imagens ou as duas imagens.

    Dica

    Para configurar seu change stream para registrar a pré-imagem em eventos de alteração, consulte o Exemplo de configuração de pré-imagem.

    Para configurar seu change stream para registrar a pós-imagem em eventos de alteração, consulte o Exemplo de configuração pós-imagem.

Para usar o driver para criar uma collection com as opções de pré-imagem e pós-imagem habilitadas, especifique uma instância de ChangeStreamPreAndPostImagesOptions e chame o método createCollection() , conforme mostrado no exemplo a seguir:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

Você pode alterar a opção pré-imagem e pós-imagem em uma coleção existente executando o comando collMod a partir do MongoDB Shell. Para saber como executar esta operação, consulte a entrada sobre collMod no manual do servidor MongoDB.

Aviso

Se você ativou pré-imagens ou pós-imagens em uma collection, modificar essas configurações com collMod pode fazer com que os change stream existentes nessa collection falhem.

O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl para incluir a pré-imagem e gerar quaisquer eventos de alteração:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

O exemplo anterior configura o change stream para utilizar a opção FullDocumentBeforeChange.REQUIRED . Essa opção configura o change stream para exigir pré-imagens para substituir, atualizar e excluir eventos de alteração. Se a pré-imagem não estiver disponível, o driver chamará um erro.

Suponha que você atualize o valor do campo amount em um documento de 150 para 2000. Esse evento de alteração produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

Para obter uma lista de opções, consulte o FullDocumentBeforeChange Documentação da API.

O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl para incluir a pré-imagem e gerar quaisquer eventos de alteração:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

O exemplo anterior configura o change stream para utilizar a opção FullDocument.WHEN_AVAILABLE . Essa opção configura o change stream para retornar a pós-imagem do documento para substituir e atualizar eventos de alteração, se estiver disponível.

Suponha que você atualize o valor do campo color em um documento de "purple" para "pink". O evento de alteração produz a seguinte saída:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

Para obter uma lista de opções, consulte o FullDocument Documentação da API.

← Acessar dados de um cursor