Docs 菜单
Docs 主页
/ / /
java sync
/ / /

打开变更流

在此页面上

  • Overview
  • 打开变更流
  • 将聚合操作符应用于change stream
  • 包含前像和后像

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例上调用watch() MongoCollectionMongoDatabaseMongoClient方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。

您对其调用watch()方法的对象决定了change stream侦听的事件范围。

如果您对MongoCollection调用watch() ,则change stream会监控一个collection。

如果您对MongoDatabase调用watch() ,则change stream将监控该数据库中的所有collection。

如果您对MongoClient调用watch() ,则change stream会监控已连接 MongoDB 部署中的所有变更。

myColl此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。

驱动程序将 change stream 事件存储在类型为ChangeStreamIterable的变量中。在以下示例中,我们指定驱动程序应使用Document类型填充ChangeStreamIterable对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

对collection执行插入操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

有关可运行的示例,请参阅监视变更用法示例页面。

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合执行更新操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

您可以配置变更事件以包含或省略以下数据:

  • 前像,表示操作前文档版本的文档(如果存在)

  • 帖子-图像,表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以记录变更事件中的前像,请参阅前像配置示例。

    要配置变更流以记录变更事件中的后映像,请参阅后映像配置示例。

要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions createCollection()方法,如以下示例所示:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream失败。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。

假设您将文档中amount字段的值从150更新为2000 。 此变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅 FullDocumentBeforeChange API 文档。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。

假设您将文档中color字段的值从"purple"更新为"pink" 。 变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅 FullDocument API 文档。

后退

从游标访问数据