打开变更流
在此页面上
Overview
在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。
您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。
通过以下部分了解如何打开和配置change stream:
打开变更流
您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。
Select a Scope to Watch
要打开change stream,请在实例watch()
MongoCollection
、MongoDatabase
或 的实例上调用MongoClient
方法。
重要
独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。
The object on which you call the watch()
method on determines the scope of
events that the change stream listens for:
MongoCollection.watch()
monitors a collection.MongoDatabase.watch()
monitors all collections in a database.MongoClient.watch()
monitors all changes in the connected MongoDB deployment.
Filter the Events
The watch()
method takes an optional aggregation pipeline as the first
parameter, which consists of a list of stages that can be used to
filter and transform the change event output, as follows:
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
注意
对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP
调用ChangeStreamIterable
对象的fullDocument()
成员方法,如下所示:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
Manage the Output
The watch()
method returns an instance of ChangeStreamIterable
, an interface
that offers several methods to access, organize, and traverse the results.
ChangeStreamIterable
also inherits methods from its parent interface,
MongoIterable
which implements the core Java interface Iterable
.
您可以对 ChangeStreamIterable
调用 forEach()
,从而在事件发生时对其进行处理,也可以使用 iterator()
方法返回可用于遍历结果的 MongoChangeStreamCursor
实例。
You can call the following methods on a MongoChangeStreamCursor
instance:
hasNext()
: Checks if there are more resultsnext()
: Returns the next document in the collectiontryNext()
: Immediately returns either the next available element in the change stream ornull
重要
Iterating the Cursor Blocks the Current Thread
Iterating through a cursor using forEach()
or any iterator()
method
blocks the current thread while the corresponding change stream listens for
events. If your program needs to continue executing other logic, such as
processing requests or responding to user input, consider creating and
listening to your change stream in a separate thread.
Unlike the MongoCursor
returned by other queries, a
MongoChangeStreamCursor
associated with a change stream waits until a change
event arrives before returning a result from next()
. As a result, calls to
next()
using a change stream's MongoChangeStreamCursor
never throw a java.util.NoSuchElementException
.
要配置用于处理从变更流返回的文档的选项,请使用 watch()
返回的 ChangeStreamIterable
对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable
API 文档链接。
例子
myColl
此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。
驱动程序将 change stream 事件存储在类型为ChangeStreamIterable
的变量中。在以下示例中,我们指定驱动程序应使用Document
类型填充ChangeStreamIterable
对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument
对象。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
对collection执行插入操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Watch Example: Full Files
注意
Example Setup
This example connects to an instance of MongoDB by using a
connection URI. To learn more about connecting to your MongoDB
instance, see the 创建 MongoClient guide. This example
also uses the movies
collection in the sample_mflix
database
included in the Atlas sample datasets. You
can load them into your database on the free tier of MongoDB Atlas
by following the Get Started with Atlas Guide.
This example demonstrates how to open a change stream by using the watch method.
The Watch.java
file calls the watch()
method with a pipeline as an
argument to filter for only "insert"
and "update"
events. The
WatchCompanion.java
file inserts, updates and deletes a document.
To use the following examples, run the files in this order:
Run the
Watch.java
file.Run the
WatchCompanion.java
file.
注意
The Watch.java
file will continue running until the
WatchCompanion.java
file is run.
Watch.java
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java
:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
Full File Example Output
The preceding applications will generate the following output:
Watch.java
will capture only the insert
and update
operations, since the aggregation pipeline filters out the delete
operation:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion
will print a summary of the operations it completed:
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
要了解有关watch()
方法的详情,请参阅以下 API 文档:
将聚合操作符应用于change stream
您可以将聚合管道作为参数传递给watch()
方法,以指定change stream接收哪些事件。
要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。
例子
以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
对集合执行更新操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
分割大型change stream事件
从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent
聚合阶段将超过 16 MB 的事件分割成更小的片段。
仅在绝对必要时使用$changeStreamSplitLargeEvent
。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent
。
$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent
每个片段都包括一个包含以下字段的对象:
字段 | 说明 |
---|---|
| 片段的索引,从 |
| 组成分割事件的分片总数 |
以下示例通过使用$changeStreamSplitLargeEvent
聚合阶段分割大型事件来修改change stream:
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
注意
聚合管道中只能有一个$changeStreamSplitLargeEvent
阶段,并且它必须是管道中的最后一个阶段。
您可以在change stream游标上调用getSplitEvent()
方法来访问SplitEvent
,如以下示例所示:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
有关$changeStreamSplitLargeEvent
聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。
包含前像和后像
您可以配置变更事件以包含或省略以下数据:
前像,表示操作前文档版本的文档(如果存在)
帖子-图像,表示操作后文档版本的文档(如果存在)
重要
仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。
要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:
为 MongoDB 部署中的collection启用前像和帖子。
提示
要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流。
要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。
配置你的 change stream 以检索前映像和后映像中的一个或两个。
创建启用前像和后像的集合
要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions
createCollection()
方法,如以下示例所示:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
您可以通过从 MongoDB Shell 运行collMod
命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。
警告
如果在collection上启用了前映像或帖子,则使用collMod
修改这些设置可能会导致该collection上的现有change stream失败。
前像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED
选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。
假设您将文档中amount
字段的值从150
更新为2000
。 此变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
有关选项列表,请参阅 FullDocumentBeforeChange API 文档。
帖子图像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE
选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。
假设您将文档中color
字段的值从"purple"
更新为"pink"
。 变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
有关选项列表,请参阅 FullDocument API 文档。
更多信息
API 文档
For more information about the methods and classes used to manage change streams, see the following API documentation: