监控数据变化
Overview
通过本指南,您可以学习如何使用变更流来监视文档更改。
变更流输出新的变更事件,提供对实时数据变更的访问权限。您可以在集合、数据库或客户端对象上打开变更流。
样本数据
本部分的示例使用以下 Course
结构作为 courses
集合中文档的模型:
type Course struct { Title string Enrollment int32 }
要运行本指南中的示例,请通过使用以下代码段将这些文档加载到 db
数据库中的 courses
集合中:
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
提示
不存在的数据库和集合
如果执行写操作时不存在必要的数据库和集合,服务器会隐式创建这些数据库和集合。
每个文档均包含某一大学课程的说明,其中包括课程标题和最大注册人数,而它们分别对应于每个文档中的 title
和enrollment
字段。
注意
每个示例输出都显示截断的 _data
、clusterTime
和 ObjectID
值,因为驱动程序会唯一生成这些值。
打开变更流
要打开变更流,请使用 Watch()
方法。Watch()
方法需要一个上下文参数和一个管道参数。要返回所有更改,请输入一个空的 Pipeline
对象。
例子
以下示例在 courses
集合上打开变更流并输出所有更改:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
如果您在单独的程序或 Shell 中修改 courses
集合,则此代码在发生更改时打印输出更改。插入 title
值为 "Advanced Screenwriting"
且 enrollment
值为 20
的文档会导致以下更改事件:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
修改变更流输出
使用管道参数修改变更流输出。此参数允许您仅监视某些变更事件。将管道参数的格式设置为文档数组,每个文档表示一个聚合阶段。
您可以在此参数中使用以下管道阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
例子
下例在 db
数据库上打开变更流,但仅监视新的删除操作:
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
在 db
数据库上调用了 Watch()
方法,因此代码会输出对此数据库内任何集合的新删除操作。
修改以下方法的行为: Watch()
使用 options
参数修改 Watch()
方法的行为。
您可以为 Watch()
方法指定以下选项:
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
StartAtOperationTime
Custom
CustomPipeline
有关这些选项的更多信息,请访问 MongoDB Server手册。
前像和后像
当您对集合执行任何 CRUD 操作时,默认情况下,相应的变更事件文档仅包含该操作修改的字段的增量。通过在 Watch()
方法的 options
参数中指定设置,您可以查看更改前后的完整文档以及增量。
如果要查看文档的后像,即文档更改后的完整版本,请将 options
参数的 FullDocument
字段设置为以下值之一:
UpdateLookup
:变更事件文档包括整个已变更文档的副本。WhenAvailable
:变更事件文档包括变更事件的已修改文档的后像(如果后像可用)。Required
:输出与WhenAvailable
相同,但如果后像不可用,驱动程序会引发服务器端错误。
如果要查看文档的前像,即文档更改前的完整版本,请将 options
参数的 FullDocumentBeforeChange
字段设置为以下值之一:
WhenAvailable
:如果前像可用,则变更事件文档包括变更事件的已修改文档的前像。Required
:输出与WhenAvailable
的输出相同,但如果前像不可用,驱动程序会引发服务器端错误。
重要
要访问文档前像和后像,您必须为集合启用 changeStreamPreAndPostImages
。请参阅 MongoDB Server 手册,获取说明和更多信息。
注意
插入的文档没有前像,已删除的文档没有后像。
例子
以下示例对 courses
集合调用 Watch()
方法。它为 options
参数的 FullDocument
字段指定一个值,以输出整个已修改文档的副本,而不是仅输出已更改字段:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
使用 "World Fiction"
的 title
将文档的 enrollment
值从 35
更新为 30
会导致以下变更事件:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
如果不指定 FullDocument
选项,同一更新操作将不再在变更事件文档中输出 "fullDocument"
值。
更多信息
有关变更流的可运行示例,请参阅监控数据更改。
有关 变更流 的更多信息,请参阅Change Streams 。
API 文档
要了解有关 Watch()
方法的更多信息,请访问以下 API 文档链接: