注意更改
您可以通过打开变更流来跟踪 MongoDB 中的数据更改,例如对集群、数据库或部署的更改。变更流允许应用程序监视数据更改并作出响应。
发生更改时,变更流会返回变更事件文档。变更事件包含有关更新后数据的信息。
通过对 MongoCollection
、MongoDatabase
或 MongoClient
对象调用 watch()
方法来打开变更流,如以下代码示例所示:
val changeStream = collection.watch()
watch()
方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数,以筛选和转换变更事件输出,如下所示:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
watch()
方法返回一个ChangeStreamFlow
类的实例,该类提供多种访问、组织和遍历结果的方法。 ChangeStreamFlow
还继承 Kotlin 协程库父类Flow
的方法。
您可以在ChangeStreamFlow
上调用collect()
,以在事件发生时对其进行处理。 或者,您可以使用Flow
内置的其他方法来处理结果。
要配置用于处理从变更流返回的文档的选项,请使用 watch()
返回的 ChangeStreamFlow
对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamFlow
API 文档链接。
使用 .collect() 处理 change stream 事件
要从change stream捕获事件,请调用collect()
方法,如下所示:
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
.collect()
函数会在发出变更事件时Atlas Triggers。您可以在函数中指定逻辑,以便在收到事件文档时对其进行处理。
注意
对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP
调用ChangeStreamFlow
对象的fullDocument()
成员方法,如下所示:
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
例子
以下示例应用程序在sample_mflix
数据库中的movies
集合上打开一个变更流。 应用程序使用聚合管道来筛选基于operationType
的更改,以便仅接收插入和更新事件。 删除被遗漏排除。 应用程序使用.collect()
方法接收和打印集合上发生的筛选更改事件。
应用程序在单独的协程作业中启动collect()
操作,这允许应用程序在change stream打开时继续运行。操作完成后,应用程序将关闭change stream并退出。
注意
此示例使用连接 URI 连接到MongoDB实例。 要学习;了解有关连接到MongoDB实例的更多信息,请参阅连接指南。
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
有关此页面上所提及的类和方法的更多信息,请参阅以下资源:
变更流服务器手动条目
变更事件服务器手册条目
聚合管道服务器手册条目
聚合阶段服务器手册条目
ChangeStreamFlow API文档
MongoCollection.watch() API 文档
MongoDatabase.watch() API 文档
MongoClient.watch() API 文档