Docs 菜单
Docs 主页
/ / /
Go
/ / /

监控数据变化

在此页面上

  • Overview
  • 样本数据
  • 打开变更流
  • 例子
  • 修改变更流输出
  • 例子
  • 修改以下方法的行为: Watch()
  • 前像和后像
  • 例子
  • 更多信息
  • API 文档

通过本指南,您可以学习如何使用变更流来监视文档更改。

变更流输出新的变更事件,提供对实时数据变更的访问权限。您可以在集合、数据库或客户端对象上打开变更流。

本部分的示例使用以下 Course 结构作为 courses 集合中文档的模型:

type Course struct {
Title string
Enrollment int32
}

要运行本指南中的示例,请通过使用以下代码段将这些文档加载到 db 数据库中的 courses 集合中:

coll := client.Database("db").Collection("courses")
docs := []interface{}{
Course{Title: "World Fiction", Enrollment: 35},
Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)

提示

不存在的数据库和集合

如果执行写操作时不存在必要的数据库和集合,服务器会隐式创建这些数据库和集合。

每个文档均包含某一大学课程的说明,其中包括课程标题和最大注册人数,而它们分别对应于每个文档中的 titleenrollment 字段。

注意

每个示例输出都显示截断的 _dataclusterTimeObjectID 值,因为驱动程序会唯一生成这些值。

要打开变更流,请使用 Watch() 方法。Watch() 方法需要一个上下文参数和一个管道参数。要返回所有更改,请输入一个空的 Pipeline 对象。

以下示例在 courses 集合上打开变更流并输出所有更改:

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

如果您在单独的程序或 Shell 中修改 courses 集合,则此代码在发生更改时打印输出更改。插入 title 值为 "Advanced Screenwriting"enrollment 值为 20 的文档会导致以下更改事件:

map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")]
fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns:
map[coll:courses db:db] operationType:insert]

使用管道参数修改变更流输出。此参数允许您仅监视某些变更事件。将管道参数的格式设置为文档数组,每个文档表示一个聚合阶段。

您可以在此参数中使用以下管道阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

下例在 db 数据库上打开变更流,但仅监视新的删除操作:

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

注意

db 数据库上调用了 Watch() 方法,因此代码会输出对此数据库内任何集合的新删除操作。

使用 options 参数修改 Watch() 方法的行为。

您可以为 Watch() 方法指定以下选项:

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • StartAtOperationTime

  • Custom

  • CustomPipeline

有关这些选项的更多信息,请访问 MongoDB Server手册。

当您对集合执行任何 CRUD 操作时,默认情况下,相应的变更事件文档仅包含该操作修改的字段的增量。通过在 Watch() 方法的 options 参数中指定设置,您可以查看更改前后的完整文档以及增量。

如果要查看文档的后像,即文档更改后的完整版本,请将 options 参数的 FullDocument 字段设置为以下值之一:

  • UpdateLookup:变更事件文档包括整个已变更文档的副本。

  • WhenAvailable:变更事件文档包括变更事件的已修改文档的后像(如果后像可用)。

  • Required:输出与 WhenAvailable 相同,但如果后像不可用,驱动程序会引发服务器端错误。

如果要查看文档的前像,即文档更改前的完整版本,请将 options 参数的 FullDocumentBeforeChange 字段设置为以下值之一:

  • WhenAvailable:如果前像可用,则变更事件文档包括变更事件的已修改文档的前像。

  • Required:输出与 WhenAvailable 的输出相同,但如果前像不可用,驱动程序会引发服务器端错误。

重要

要访问文档前像和后像,您必须为集合启用 changeStreamPreAndPostImages。请参阅 MongoDB Server 手册,获取说明和更多信息。

注意

插入的文档没有前像,已删除的文档没有后像。

以下示例对 courses 集合调用 Watch() 方法。它为 options 参数的 FullDocument 字段指定一个值,以输出整个已修改文档的副本,而不是仅输出已更改字段:

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

使用 "World Fiction"title 将文档的 enrollment 值从 35 更新为 30 会导致以下变更事件:

{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp":
{"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment":
{"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id":
{"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

如果不指定 FullDocument 选项,同一更新操作将不再在变更事件文档中输出 "fullDocument" 值。

有关变更流的可运行示例,请参阅监控数据更改。

有关 变更流 的更多信息,请参阅Change Streams 。

要了解有关 Watch() 方法的更多信息,请访问以下 API 文档链接:

后退

搜索文本