Docs 菜单
Docs 主页
/ / /
Java Reactive Streams 驱动程序
/

监控数据变化

在此页面上

  • Overview
  • 样本数据
  • 打开变更流
  • 修改变更流输出
  • 修改 watch()行为
  • 包含前像和后像
  • 更多信息
  • API 文档

在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

本指南中的示例使用 Atlas示例数据集中 sample_restaurants.restaurants集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门指南。

重要

项目 Reactor 库

本指南使用 Project ReactorPublisher 库来使用Java Reactive Streams驾驶员方法返回的 实例。要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 Reactor 文档中的“入门” 。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB ”指南。

要打开变更流,请调用watch()方法。 调用该方法的实例决定了变更流侦听的事件范围。 您可以对以下类的实例调用watch()方法:

  • MongoClient:监控 MongoDB 部署中的所有更改

  • MongoDatabase:监控数据库中所有集合的变更

  • MongoCollection:监控集合中的更改

以下示例在restaurants集合上打开变更流,并在发生变更时输出变更:

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中对 restaurants集合执行写入操作。 更新"name"字段值为"Blarney Castle"的文档会产生以下变更流输出:

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

您可以将聚合管道作为参数传递给watch()方法,以修改变更流输出。 此参数允许您仅监视指定的变更事件。

您可以在pipeline参数中指定以下聚合阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例将聚合管道传递给变更流以仅记录更新操作:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。

您可以将方法链接到watch()方法,这些方法表示可用于配置变更流操作的选项。 如果不指定任何选项,驾驶员不会自定义操作。

下表描述了可以链接到watch()以自定义其行为的方法:

选项
说明
fullDocument()
Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
fullDocumentBeforeChange()
Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.
resumeAfter()
Directs watch() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
resumeAfter() is mutually exclusive with startAfter() and startAtOperationTime().
startAfter()
Directs watch() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
startAfter() is mutually exclusive with resumeAfter() and startAtOperationTime().
startAtOperationTime()
Directs watch() to return only events that occur after the specified timestamp.
startAtOperationTime() is mutually exclusive with resumeAfter() and startAfter().
maxAwaitTime()
Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
showExpandedEvents()
Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, call this method and pass in the value, true.
batchSize()
Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.
collation()
Specifies the collation to use for the change stream cursor.
comment()
Attaches a comment to the operation.

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请将fullDocumentBeforeChange()fullDocument()方法链接到watch()方法。

前像是文档在更改之前的完整版本。 要将前像包含在变更流事件中,请将以下值之一传递给fullDocumentBeforeChange()方法:

  • FullDocumentBeforeChange.WHEN_AVAILABLE:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • FullDocumentBeforeChange.REQUIRED:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改的完整版本。 要将后图像包含在变更流事件中,请将以下值之一传递给fullDocument()方法:

  • FullDocument.UPDATE_LOOKUP:更改事件包括更改后某个时间点的整个已更改文档的副本。

  • FullDocument.WHEN_AVAILABLE:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • FullDocument.REQUIRED:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例在集合上打开变更流,并通过将fullDocument()方法链接到watch()方法来包含更新文档的后映像:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档:

后退

地理空间Atlas Search