Fique atento às mudanças
Você pode acompanhar as alterações nos dados do MongoDB, como alterações em uma coleção, banco de dados ou sistema, abrindo um change stream. Um change stream permite que os aplicativos observem as alterações nos dados e reajam a elas.
O change stream gera documentos de evento de alteração quando ocorrem alterações. Um evento de alteração contém informações sobre os dados atualizados.
Abra um change stream chamando o método watch()
em um objeto MongoCollection
, MongoDatabase
ou MongoClient
como demonstrado no seguinte exemplo de código:
val changeStream = collection.watch()
Opcionalmente, o método watch()
usa uma aggregation pipeline pipeline que consiste em uma array de estágios como o primeiro parâmetro para filtrar e transformar a saída do evento de alteração da seguinte forma:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
O método watch()
retorna uma instância do ChangeStreamFlow
, uma classe que oferece vários métodos para acessar, organizar e percorrer os resultados. ChangeStreamFlow
também herda métodos de sua classe principal Flow
da biblioteca Kotlin corrotina.
Você pode chamar collect()
no ChangeStreamFlow
para lidar com eventos à medida que eles ocorrem. Alternativamente, você pode usar outros métodos embutidos no Flow
para trabalhar com os resultados.
Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamFlow
gerado por watch()
. Acesse o link para a documentação da API do ChangeStreamFlow
na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.
Processar evento de change stream com .collect()
Para capturar eventos de um change stream, chame o método collect()
como mostrado abaixo:
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
A função .collect()
Atlas Triggers se um evento de alteração é emitido. Você pode especificar a lógica na função para processar o documento do evento quando ele for recebido.
Observação
Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument()
do objeto ChangeStreamFlow
com o valor FullDocument.UPDATE_LOOKUP
da seguinte forma:
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
Exemplo
A aplicação de exemplo a seguir abre um change stream na collection movies
no reconhecimento de data center sample_mflix
. O aplicativo usa um aggregation pipeline para filtrar as alterações com base em operationType
para que ele receba apenas eventos de inserção e atualização. As exclusões são excluídas por omissão. A aplicação utiliza o método .collect()
para receber e imprimir os evento de alteração filtrados que ocorrem na collection.
A aplicação inicia a operação collect()
em uma tarefa corrotina separada, o que permite que a aplicação continue em execução enquanto o change stream estiver aberto. Depois que as operações forem concluídas, o aplicativo fechará o change stream e sairá.
Observação
Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB , consulte oguia de conexão .
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
Para obter informações adicionais sobre as classes e métodos mencionados nesta página, consulte os seguintes recursos:
Change Streams Entrada manual do servidor
Eventos de alteração Entrada manual do servidor
Pipeline de agregação Entrada manual do servidor
Estágios de agregação Entrada manual do servidor
ChangeStreamFlow Documentação da API
MongoCollection.watch() Documentação da API
MongoDatabase.watch() Documentação da API
MongoClient.watch() Documentação da API