Fluxos de change streams
Nesta página
Visão geral
Neste guia, você pode aprender como usar um change stream para monitorar alterações em tempo real em seu reconhecimento de data center. Um change stream é um recurso do MongoDB Server que permite que sua aplicação assine alterações de dados em uma única collection, reconhecimento de data center ou sistema. Você pode especificar um conjunto de operadores de agregação para filtrar e transformar os dados que seu aplicativo recebe. Ao se conectar ao MongoDB v6.0 ou posterior, você pode configurar os eventos para incluir os dados do documento antes e depois da alteração.
Saiba como abrir e configurar seu change stream nas seguintes seções:
Abrir um fluxo de alterações
Você pode abrir um fluxo de alterações para assinar tipos específicos de alterações de dados e produzir eventos de alteração em seu aplicativo.
Para abrir um change stream, chame o método watch()
em uma instância de um MongoCollection
, MongoDatabase
ou MongoClient
.
Importante
Sistemas standalone do MongoDB não oferecem suporte a change streams porque o recurso exige um oplog de conjunto de réplicas. Para saber mais sobre o oplog, consulte a página de manual do servidor de oplogConjunto de réplicas .
O objeto no qual você chama o método watch()
determina o escopo de eventos que o change stream escuta.
Se você chamar watch()
em um MongoCollection
, o change stream monitorará uma collection.
Se você chamar watch()
em um MongoDatabase
, o change stream monitorará todas as collection nesse reconhecimento de data center.
Se você chamar watch()
em um MongoClient
, o change stream monitorará todas as alterações na implantação do MongoDB conectada.
Exemplo
O exemplo de código a seguir mostra como abrir um change stream e imprimir eventos do change stream sempre que os dados na collection forem alterados:
// Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
Uma operação de inserção na collection deve producir uma saída semelhante ao seguinte texto:
Received a change event: ChangeStreamDocument{ operationType='insert', resumeToken={"_data": "825EC..."}, namespace=myDb.myChangeStreamCollection, ... }
Para obter um exemplo executável, consulte a página de exemplo de uso Fique atento às mudanças .
Para saber mais sobre o método watch()
, consulte a seguinte documentação da API:
Aplique operadores de agregação ao seu change stream
Você pode passar um pipeline de agregação como um parâmetro para o método watch()
para especificar quais eventos de alteração o fluxo de alteração recebe.
Para saber quais operadores de aggregation sua versão do servidor MongoDB suporta, consulte Modificar a saída do change stream.
Exemplo
O exemplo de código a seguir mostra como você pode aplicar um pipeline de agregação para configurar seu change stream para receber evento de alteração apenas para operações de inserção e atualização:
val pipeline = listOf( Aggregates.match(Filters.`in`("operationType", listOf("insert", "update"))) ) // Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
Quando o change stream recebe um evento de alteração de atualização, o exemplo de código anterior gera o seguinte texto:
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, ...
Dividir eventos de fluxo de grandes mudanças
Ao conectar ao MongoDB v7.0 ou posterior, você pode usar o operador de agregação $changeStreamSplitLargeEvent
para fazer a divisão de documento de evento que excedam 16 MB em fragmentos menores.
Use o operador $changeStreamSplitLargeEvent
somente quando você espera que os eventos do change stream excedam o limite de tamanho do documento. Por exemplo, você pode usar esse recurso se o seu aplicativo exigir pré-imagens ou pós-imagens completas do documento.
Um estágio de agregação $changeStreamSplitLargeEvent
retorna fragmentos sequencialmente. Você pode acessar os fragmentos usando um cursor de change stream. Cada documento de fragmento inclui um objeto splitEvent
que contém os seguintes campos:
Campo | Descrição |
---|---|
fragment | O índice do fragmento, começando em 1 |
of | O número total de fragmentos que compõem o evento de divisão |
O exemplo a seguir abre um change stream que inclui um pipeline de agregação com um estágio de agregação $changeStreamSplitLargeEvent
para a divisão de evento grandes:
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument())) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
Observação
Você pode ter apenas um estágio $changeStreamSplitLargeEvent
no seu pipeline de agregação, e ele deve ser o último estágio no pipeline.
Para saber mais sobre o operador de agregação do $changeStreamSplitLargeEvent
, consulte $changeStreamSplitLargeEvent (agregação) no manual do servidor MongoDB.
Incluir pré-imagens e pós-imagens
Você pode configurar o evento de alteração para conter ou omitir os seguintes dados:
A pré-imagem, que é um documento que representa a versão do documento antes da operação, se existir
A pós-imagem, que é um documento que representa a versão do documento após a operação, se ela existir
Para receber eventos de fluxo de alterações que incluem uma pré-imagem ou pós-imagem, você deve se conectar a uma implantação do MongoDB v6.0 ou posterior e configurar o seguinte:
Habilite pré-imagens e pós-imagens para a coleção em seu sistema do MongoDB.
Dica
Para saber como habilitá-los em seu sistema, consulte a página de manual do servidor MongoDB de Change Streams com pré e pós-imagens de documentos.
Para saber como instruir o driver a criar uma collection com pré-imagens e pós-imagens habilitadas, consulte a seção Criar uma collection com pré-imagem e pós-imagens habilitadas .
Configure seu change stream para recuperar as pré-imagens e as pós-imagens ou as duas imagens.
Dica
Para configurar seu change stream para incluir a pré-imagem, consulte o Exemplo de configuração de pré-imagem.
Para configurar seu change stream para incluir a pós-imagem, consulte o Exemplo de configuração pós-imagem.
Crie uma coleção com pré-imagem e pós-imagens habilitadas
Para criar uma collection com a opção pré-imagem e pós-imagem utilizando o driver, especifique uma instância de ChangeStreamPreAndPostImagesOptions
e chame o método createCollection()
como mostrado no seguinte exemplo:
val collectionOptions = CreateCollectionOptions() collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true)) database.createCollection("myChangeStreamCollection", collectionOptions)
Você pode alterar a opção pré-imagem e pós-imagem em uma coleção existente executando o comando collMod
a partir do MongoDB Shell. Para saber como executar esta operação, consulte a documentação manual do servidor collMod .
Aviso
Quando você modifica essa opção em uma collection, qualquer change stream aberto nessa collection em seu aplicativo pode falhar se estiver configurado para exigir o recebimento da pré-imagem ou da pós-imagem.
Exemplo de configuração de pré-imagem
O exemplo de código a seguir mostra como configurar um change stream para incluir a pré-imagem e gerar os resultados:
val job = launch { val changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
O exemplo anterior configura o change stream para utilizar a opção FullDocumentBeforeChange.REQUIRED
. Isso configura o change stream para retornar pré-imagens para substituir, atualizar e excluir evento de alteração e para o servidor gerar um erro se a pré-imagem não estiver disponível.
Suponha que uma aplicação atualizou o campo latestVersion
de um documento em uma collection de dependências da biblioteca de software do valor de 2.0.0
para 2.1.0
. A saída do evento de alteração correspondente pelo exemplo de código anterior deve se assemelhar ao seguinte texto:
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...} namespace=software.libraries, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}}, ...
Para obter uma lista de opções, consulte o FullDocumentBeforeChange Documentação da API.
Exemplo de configuração pós-imagem
O exemplo de código a seguir mostra como configurar um change stream para incluir a pós-imagem e gerar os resultados:
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
O exemplo anterior configura o change stream para utilizar a opção FullDocument.UPDATE_LOOKUP
. Isso configura o change stream para retornar ambos os deltas entre o documento original e o documento alterado e uma cópia do documento em algum ponto após a ocorrência da alteração.
Suponha que uma aplicação tenha atualizado o campo population
de um documento do valor de 800
para 950
em uma collection de dados de cesso de cidade. A saída do evento de alteração correspondente pelo exemplo de código anterior deve se assemelhar ao seguinte texto:
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, namespace=censusData.cities, destinationNamespace=null, fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}}, updatedFields={"population": 950}, ... ...
Para obter uma lista de opções, consulte o FullDocument Documentação da API.