Docs 菜单
Docs 主页
/ / /
Kotlin 协程
/

注意更改

您可以通过打开变更流来跟踪 MongoDB 中的数据更改,例如对集群、数据库或部署的更改。变更流允许应用程序监视数据更改并作出响应。

发生更改时,变更流会返回变更事件文档。变更事件包含有关更新后数据的信息。

通过对 MongoCollectionMongoDatabaseMongoClient 对象调用 watch() 方法来打开变更流,如以下代码示例所示:

val changeStream = collection.watch()

watch() 方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数,以筛选和转换变更事件输出,如下所示:

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

watch()方法返回一个ChangeStreamFlow类的实例,该类提供多种访问、组织和遍历结果的方法。 ChangeStreamFlow还继承 Kotlin 协程库父类Flow的方法。

您可以在ChangeStreamFlow上调用collect() ,以在事件发生时对其进行处理。 或者,您可以使用Flow内置的其他方法来处理结果。

要配置用于处理从变更流返回的文档的选项,请使用 watch() 返回的 ChangeStreamFlow 对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamFlow API 文档链接。

要从change stream捕获事件,请调用collect()方法,如下所示:

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

.collect()函数会在发出变更事件时Atlas Triggers。您可以在函数中指定逻辑,以便在收到事件文档时对其进行处理。

注意

对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP调用ChangeStreamFlow对象的fullDocument()成员方法,如下所示:

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

以下示例应用程序在sample_mflix数据库中的movies集合上打开一个变更流。 应用程序使用聚合管道来筛选基于operationType的更改,以便仅接收插入和更新事件。 删除被遗漏排除。 应用程序使用.collect()方法接收和打印集合上发生的筛选更改事件。

应用程序在单独的协程作业中启动collect()操作,这允许应用程序在change stream打开时继续运行。操作完成后,应用程序将关闭change stream并退出。

注意

此示例使用连接 URI 连接到MongoDB实例。 要学习;了解有关连接到MongoDB实例的更多信息,请参阅连接指南。

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

有关此页面上所提及的类和方法的更多信息,请参阅以下资源:

  • 变更流服务器手动条目

  • 变更事件服务器手册条目

  • 聚合管道服务器手册条目

  • 聚合阶段服务器手册条目

  • ChangeStreamFlow API文档

  • MongoCollection.watch() API 文档

  • MongoDatabase.watch() API 文档

  • MongoClient.watch() API 文档

后退

批量操作