Menu Docs
Página inicial do Docs
/ / /
Kotlin Coroutine
/

Fique atento às mudanças

Você pode acompanhar as alterações nos dados do MongoDB, como alterações em uma coleção, banco de dados ou sistema, abrindo um change stream. Um change stream permite que os aplicativos observem as alterações nos dados e reajam a elas.

O change stream gera documentos de evento de alteração quando ocorrem alterações. Um evento de alteração contém informações sobre os dados atualizados.

Abra um change stream chamando o método watch() em um objeto MongoCollection, MongoDatabase ou MongoClient como demonstrado no seguinte exemplo de código:

val changeStream = collection.watch()

Opcionalmente, o método watch() usa uma aggregation pipeline pipeline que consiste em uma array de estágios como o primeiro parâmetro para filtrar e transformar a saída do evento de alteração da seguinte forma:

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

O método watch() retorna uma instância do ChangeStreamFlow, uma classe que oferece vários métodos para acessar, organizar e percorrer os resultados. ChangeStreamFlow também herda métodos de sua classe principal Flow da biblioteca Kotlin corrotina.

Você pode chamar collect() no ChangeStreamFlow para lidar com eventos à medida que eles ocorrem. Alternativamente, você pode usar outros métodos embutidos no Flow para trabalhar com os resultados.

Para configurar as opções de processamento dos documentos gerados do change stream, use os métodos de nó do objeto ChangeStreamFlow gerado por watch(). Acesse o link para a documentação da API do ChangeStreamFlow na parte inferior deste exemplo para obter mais informações sobre os métodos disponíveis.

Para capturar eventos de um change stream, chame o método collect() como mostrado abaixo:

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

A função .collect() Atlas Triggers se um evento de alteração é emitido. Você pode especificar a lógica na função para processar o documento do evento quando ele for recebido.

Observação

Para eventos de alteração de operação de atualização, os fluxos de alteração retornam apenas os campos modificados por padrão, em vez de todo o documento atualizado. Você pode configurar seu change stream para retornar também a versão mais atual do documento, chamando o método-membro fullDocument() do objeto ChangeStreamFlow com o valor FullDocument.UPDATE_LOOKUP da seguinte forma:

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

A aplicação de exemplo a seguir abre um change stream na collection movies no reconhecimento de data center sample_mflix . O aplicativo usa um aggregation pipeline para filtrar as alterações com base em operationType para que ele receba apenas eventos de inserção e atualização. As exclusões são excluídas por omissão. A aplicação utiliza o método .collect() para receber e imprimir os evento de alteração filtrados que ocorrem na collection.

A aplicação inicia a operação collect() em uma tarefa corrotina separada, o que permite que a aplicação continue em execução enquanto o change stream estiver aberto. Depois que as operações forem concluídas, o aplicativo fechará o change stream e sairá.

Observação

Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB , consulte oguia de conexão .

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

Para obter informações adicionais sobre as classes e métodos mencionados nesta página, consulte os seguintes recursos:

  • Change Streams Entrada manual do servidor

  • Eventos de alteração Entrada manual do servidor

  • Pipeline de agregação Entrada manual do servidor

  • Estágios de agregação Entrada manual do servidor

  • ChangeStreamFlow Documentação da API

  • MongoCollection.watch() Documentação da API

  • MongoDatabase.watch() Documentação da API

  • MongoClient.watch() Documentação da API

Voltar

Realizar operações em massa