Docs 菜单

打开变更流

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例watch() MongoCollectionMongoDatabase 或 的实例上调用MongoClient 方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。

The object on which you call the watch() method on determines the scope of events that the change stream listens for:

  • MongoCollection.watch() monitors a collection.

  • MongoDatabase.watch() monitors all collections in a database.

  • MongoClient.watch() monitors all changes in the connected MongoDB deployment.

The watch() method takes an optional aggregation pipeline as the first parameter, which consists of a list of stages that can be used to filter and transform the change event output, as follows:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

注意

对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP调用ChangeStreamIterable对象的fullDocument()成员方法,如下所示:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

The watch() method returns an instance of ChangeStreamIterable, an interface that offers several methods to access, organize, and traverse the results. ChangeStreamIterable also inherits methods from its parent interface, MongoIterable which implements the core Java interface Iterable.

您可以对 ChangeStreamIterable 调用 forEach(),从而在事件发生时对其进行处理,也可以使用 iterator() 方法返回可用于遍历结果的 MongoChangeStreamCursor 实例。

You can call the following methods on a MongoChangeStreamCursor instance:

  • hasNext(): Checks if there are more results

  • next(): Returns the next document in the collection

  • tryNext(): Immediately returns either the next available element in the change stream or null

重要

Iterating the Cursor Blocks the Current Thread

Iterating through a cursor using forEach() or any iterator() method blocks the current thread while the corresponding change stream listens for events. If your program needs to continue executing other logic, such as processing requests or responding to user input, consider creating and listening to your change stream in a separate thread.

Unlike the MongoCursor returned by other queries, a MongoChangeStreamCursor associated with a change stream waits until a change event arrives before returning a result from next(). As a result, calls to next() using a change stream's MongoChangeStreamCursor never throw a java.util.NoSuchElementException.

要配置用于处理从变更流返回的文档的选项,请使用 watch() 返回的 ChangeStreamIterable 对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable API 文档链接。

myColl此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。

驱动程序将 change stream 事件存储在类型为ChangeStreamIterable的变量中。在以下示例中,我们指定驱动程序应使用Document类型填充ChangeStreamIterable对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

对collection执行插入操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

注意

Example Setup

This example connects to an instance of MongoDB by using a connection URI. To learn more about connecting to your MongoDB instance, see the 创建 MongoClient guide. This example also uses the movies collection in the sample_mflix database included in the Atlas sample datasets. You can load them into your database on the free tier of MongoDB Atlas by following the Get Started with Atlas Guide.

This example demonstrates how to open a change stream by using the watch method. The Watch.java file calls the watch() method with a pipeline as an argument to filter for only "insert" and "update" events. The WatchCompanion.java file inserts, updates and deletes a document.

To use the following examples, run the files in this order:

  1. Run the Watch.java file.

  2. Run the WatchCompanion.java file.

注意

The Watch.java file will continue running until the WatchCompanion.java file is run.

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

The preceding applications will generate the following output:

Watch.java will capture only the insert and update operations, since the aggregation pipeline filters out the delete operation:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion will print a summary of the operations it completed:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合执行更新操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent聚合阶段将超过 16 MB 的事件分割成更小的片段。

仅在绝对必要时使用$changeStreamSplitLargeEvent 。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent

$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent每个片段都包括一个包含以下字段的对象:

字段
说明

fragment

片段的索引,从1开始

of

组成分割事件的分片总数

以下示例通过使用$changeStreamSplitLargeEvent聚合阶段分割大型事件来修改change stream:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

聚合管道中只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

您可以在change stream游标上调用getSplitEvent()方法来访问SplitEvent ,如以下示例所示:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

有关$changeStreamSplitLargeEvent聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。

您可以配置变更事件以包含或省略以下数据:

  • 前像,表示操作前文档版本的文档(如果存在)

  • 帖子-图像,表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以记录变更事件中的前像,请参阅前像配置示例。

    要配置变更流以记录变更事件中的后映像,请参阅后映像配置示例。

要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions createCollection()方法,如以下示例所示:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream失败。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。

假设您将文档中amount字段的值从150更新为2000 。 此变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅 FullDocumentBeforeChange API 文档。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。

假设您将文档中color字段的值从"purple"更新为"pink" 。 变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅 FullDocument API 文档。

For more information about the methods and classes used to manage change streams, see the following API documentation: