Docs 菜单
Docs 主页
/ / /
Java (Sync) 驱动程序
/ / /

打开变更流

在此页面上

  • Overview
  • 打开变更流
  • 将聚合操作符应用于change stream
  • 分割大型change stream事件
  • 包含前像和后像

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

  • 打开变更流

  • 将聚合操作符应用于change stream

  • 包含前像和后像

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例上调用watch() MongoCollectionMongoDatabaseMongoClient方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。

您对其调用watch()方法的对象决定了change stream侦听的事件范围。

如果您对MongoCollection调用watch() ,则change stream会监控一个collection。

如果您对MongoDatabase调用watch() ,则change stream将监控该数据库中的所有collection。

如果您对MongoClient调用watch() ,则change stream会监控已连接 MongoDB 部署中的所有变更。

myColl此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。

驱动程序将 change stream 事件存储在类型为ChangeStreamIterable的变量中。在以下示例中,我们指定驱动程序应使用Document类型填充ChangeStreamIterable对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

对collection执行插入操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

有关可运行的示例,请参阅监视变更用法示例页面。

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合执行更新操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent聚合阶段将超过 16 MB 的事件分割成更小的片段。

仅在绝对必要时使用$changeStreamSplitLargeEvent 。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent

$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent每个片段都包括一个包含以下字段的对象:

字段
说明
fragment
片段的索引,起始于 1
of
组成分割事件的分片总数

以下示例通过使用$changeStreamSplitLargeEvent聚合阶段分割大型事件来修改change stream:

ChangeStreamIterable<Document> changeStream = collection.watch(
Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

聚合管道中只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

您可以在change stream游标上调用getSplitEvent()方法来访问SplitEvent ,如以下示例所示:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

有关$changeStreamSplitLargeEvent聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。

您可以配置变更事件以包含或省略以下数据:

  • 前像,表示操作前文档版本的文档(如果存在)

  • 帖子-图像,表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以记录变更事件中的前像,请参阅前像配置示例。

    要配置变更流以记录变更事件中的后映像,请参阅后映像配置示例。

要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions createCollection()方法,如以下示例所示:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream失败。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。

假设您将文档中amount字段的值从150更新为2000 。 此变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅 FullDocumentBeforeChange API 文档。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。

假设您将文档中color字段的值从"purple"更新为"pink" 。 变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅 FullDocument API 文档。

后退

从游标访问数据