Fluxos de change streams
Nesta página
Visão geral
Neste guia, você pode aprender como usar um change stream para monitorar alterações em tempo real em seu reconhecimento de data center. Um change stream é um recurso do MongoDB Server que permite que sua aplicação assine alterações de dados em uma única collection, reconhecimento de data center ou sistema.
Você pode especificar um conjunto de operadores de agregação para filtrar e transformar os dados que seu aplicativo recebe. Ao se conectar a um sistema MongoDB v6.0 ou posterior, você também pode configurar os eventos para incluir os dados do documento antes e depois da alteração.
Saiba como abrir e configurar seu change stream nas seguintes seções:
Abrir um fluxo de alterações
Você pode abrir um fluxo de alterações para assinar tipos específicos de alterações de dados e produzir eventos de alteração em seu aplicativo.
Para abrir um change stream, chame o método watch()
em uma instância de um MongoCollection
, MongoDatabase
ou MongoClient
.
Importante
Sistemas standalone do MongoDB não oferecem suporte a change streams porque o recurso exige um oplog de conjunto de réplicas. Para saber mais sobre o oplog, consulte a página de manual do Replica Set oplog MongoDB Server .
O objeto no qual você chama o método watch()
determina o escopo de eventos que o change stream escuta.
Se você chamar watch()
em um MongoCollection
, o change stream monitorará uma collection.
Se você chamar watch()
em um MongoDatabase
, o change stream monitorará todas as collection nesse reconhecimento de data center.
Se você chamar watch()
em um MongoClient
, o change stream monitorará todas as alterações na implantação do MongoDB conectada.
Exemplo
Este exemplo mostra como abrir um change stream na collection myColl
e imprimir eventos do change stream conforme eles ocorrem.
O driver armazena evento de change stream em uma variável do tipo ChangeStreamIterable
. No exemplo a seguir, especificamos que o driver deve preencher o objeto ChangeStreamIterable
com tipos Document
. Como resultado, o driver armazena evento individuais de change stream como objeto ChangeStreamDocument
.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
Uma operação de inserção na collection produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Para obter um exemplo executável, consulte a página de exemplo de uso Fique atento às mudanças .
Para saber mais sobre o método watch()
, consulte a seguinte documentação da API:
Aplique operadores de agregação ao seu change stream
Você pode passar um pipeline de agregação como um parâmetro para o método watch()
para especificar quais eventos de alteração o fluxo de alteração recebe.
Para saber quais operadores de aggregation sua versão do MongoDB Server suporta, consulte Modificar a saída do change stream.
Exemplo
O exemplo de código a seguir mostra como você pode aplicar um pipeline de agregação para configurar seu change stream para receber evento de alteração apenas para operações de inserção e atualização:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
Uma operação de atualização na collection produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Dividir eventos de fluxo de grandes mudanças
A partir do MongoDB 7.0, você pode usar o estágio de agregação $changeStreamSplitLargeEvent
para fazer a divisão de evento que excedem 16 MB em fragmentos menores.
Utilize o $changeStreamSplitLargeEvent
somente quando estritamente necessário. For example, use $changeStreamSplitLargeEvent
if your application requires full document pre- or post-images, and generates events that exceed 16 MB.
O estágio $changeStreamSplitLargeEvent retorna os fragmentos sequencialmente. Você pode acessar os fragmentos usando um cursor de change stream. Cada fragmento inclui um objeto SplitEvent
contendo os seguintes campos:
Campo | Descrição |
---|---|
fragment | O índice do fragmento, começando em 1 |
of | O número total de fragmentos que compõem o evento de divisão |
O exemplo a seguir modifica seu change stream usando o estágio de aggregation $changeStreamSplitLargeEvent
para fazer a divisão de evento grandes:
ChangeStreamIterable<Document> changeStream = collection.watch( Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
Observação
Você pode ter apenas um estágio $changeStreamSplitLargeEvent
no seu pipeline de agregação, e ele deve ser o último estágio no pipeline.
Você pode chamar o método getSplitEvent()
no cursor do change stream para acessar o SplitEvent
, conforme mostrado no exemplo a seguir:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
Para obter mais informações sobre o estágio de agregação $changeStreamSplitLargeEvent
, consulte a documentação do servidor $changeStreamSplitLargeEvent .
Incluir pré-imagens e pós-imagens
Você pode configurar o evento de alteração para conter ou omitir os seguintes dados:
A pré-imagem, um documento que representa a versão do documento antes da operação, se existir
A pós-imagem, um documento que representa a versão do documento após a operação, se existir
Importante
Você pode habilitar pré e pós-imagens em coleções somente se seu sistema usar o MongoDB v6.0 ou posterior.
Para receber evento de change stream que incluem uma pré-imagem ou pós-imagem, você deve executar a seguinte ação:
Habilite pré-imagens e pós-imagens para a coleção em seu sistema do MongoDB.
Dica
Para saber como ativar as pré e pós-imagens em sua implantação, consulte Fluxo de alterações com pré e pós-imagens de documentos no manual do servidor.
Para saber como instruir o driver a criar uma collection com pré-imagens e pós-imagens habilitadas, consulte a seção Criar uma collection com pré-imagem e pós-imagens habilitadas .
Configure seu change stream para recuperar as pré-imagens e as pós-imagens ou as duas imagens.
Dica
Para configurar seu change stream para registrar a pré-imagem em eventos de alteração, consulte o Exemplo de configuração de pré-imagem.
Para configurar seu change stream para registrar a pós-imagem em eventos de alteração, consulte o Exemplo de configuração pós-imagem.
Crie uma coleção com pré-imagem e pós-imagens habilitadas
Para usar o driver para criar uma collection com as opções de pré-imagem e pós-imagem habilitadas, especifique uma instância de ChangeStreamPreAndPostImagesOptions
e chame o método createCollection()
, conforme mostrado no exemplo a seguir:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
Você pode alterar a opção pré-imagem e pós-imagem em uma coleção existente executando o comando collMod
a partir do MongoDB Shell. Para saber como executar esta operação, consulte a entrada sobre collMod no manual do servidor MongoDB.
Aviso
Se você ativou pré-imagens ou pós-imagens em uma collection, modificar essas configurações com collMod
pode fazer com que os change stream existentes nessa collection falhem.
Exemplo de configuração de pré-imagem
O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl
para incluir a pré-imagem e gerar quaisquer eventos de alteração:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
O exemplo anterior configura o change stream para utilizar a opção FullDocumentBeforeChange.REQUIRED
. Essa opção configura o change stream para exigir pré-imagens para substituir, atualizar e excluir eventos de alteração. Se a pré-imagem não estiver disponível, o driver chamará um erro.
Suponha que você atualize o valor do campo amount
em um documento de 150
para 2000
. Esse evento de alteração produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
Para obter uma lista de opções, consulte o FullDocumentBeforeChange Documentação da API.
Exemplo de configuração pós-imagem
O exemplo de código a seguir mostra como você pode configurar um change stream na collection myColl
para incluir a pré-imagem e gerar quaisquer eventos de alteração:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
O exemplo anterior configura o change stream para utilizar a opção FullDocument.WHEN_AVAILABLE
. Essa opção configura o change stream para retornar a pós-imagem do documento para substituir e atualizar eventos de alteração, se estiver disponível.
Suponha que você atualize o valor do campo color
em um documento de "purple"
para "pink"
. O evento de alteração produz a seguinte saída:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
Para obter uma lista de opções, consulte o FullDocument Documentação da API.