Docs 菜单
Docs 主页
/ / /
java sync
/

注意更改

您可以通过打开变更流来跟踪 MongoDB 中的数据更改,例如对集群、数据库或部署的更改。变更流允许应用程序监视数据更改并作出响应。

发生更改时,变更流会返回变更事件文档。变更事件包含有关更新后数据的信息。

通过对 MongoCollectionMongoDatabaseMongoClient 对象调用 watch() 方法来打开变更流,如以下代码示例所示:

ChangeStreamIterable<Document> changeStream = database.watch();

watch() 方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数,以筛选和转换变更事件输出,如下所示:

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

watch() 方法返回 ChangeStreamIterable 类的一个实例,这个类提供访问、组织和遍历结果的多种方法。ChangeStreamIterable 还从其父类 MongoIterable 继承方法,该父类实施核心 Java 接口 Iterable

您可以对 ChangeStreamIterable 调用 forEach(),从而在事件发生时对其进行处理,也可以使用 iterator() 方法返回可用于遍历结果的 MongoCursor 实例。

您可以调用 MongoCursor 上的方法,例如调用 hasNext() 来检查是否存在其他结果,调用 next() 来返回集合中的下一份文档,或者调用 tryNext() 来立即返回变更流中的下一个可用元素或 null。与其他查询返回的 MongoCursor 不同,与变更流关联的 MongoCursor 会等到更改事件到达后才从 next() 返回结果。因此,使用变更流的 MongoCursor 调用 next() 永远不会引发 java.util.NoSuchElementException

要配置用于处理从变更流返回的文档的选项,请使用 watch() 返回的 ChangeStreamIterable 对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable API 文档链接。

要从变更流捕获事件,请使用 回调 函数调用 forEach() 方法,如下所示:

changeStream.forEach(event -> System.out.println("Change observed: " + event));

回调 函数在发出更改事件时触发。您可以在 回调 中指定逻辑,以便在收到事件文档时对其进行处理。

重要

forEach() 会阻止当前线程

只要相应的变更流侦听事件,对 forEach() 的调用就会阻止当前线程。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在单独的线程中创建和监听您的变更流。

注意

对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP调用ChangeStreamIterable对象的fullDocument()成员方法,如下所示:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

以下示例使用两个单独的应用程序来演示如何使用变更流侦听更改:

  • 第一个应用程序名为Watch ,在sample_mflix数据库中的movies集合上打开变更流。Watch使用聚合管道来筛选基于operationType的更改,以便它仅接收插入和更新事件(删除被省略排除)。Watch 使用回调来接收和打印集合上发生的筛选更改事件。

  • 第二个应用程序名为 WatchCompanion,它将单个文档插入到 sample_mflix 数据库的 movies 集合中。接下来,WatchCompanion 使用新的字段值更新文档。最后,WatchCompanion 删除该文档。

首先,运行 Watch 以打开集合上的变更流,并使用 forEach() 方法在变更流上定义回调。当 Watch 运行时,运行 WatchCompanion 以通过对集合执行更改来生成变更事件。

注意

此示例使用连接 URI 连接到MongoDB实例。 要学习;了解有关连接到MongoDB实例的更多信息,请参阅连接指南。

Watch:

package usage.examples;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// variables referenced in a lambda must be final; final array gives us a mutable integer
final int[] numberOfEvents = {0};
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

如果您按顺序运行前面的应用程序,则应看到 Watch 应用程序的输出与以下内容类似。仅打印 insertupdate 操作,因为聚合管道会筛选出 delete 操作:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

您还应看到 WatchCompanion 应用程序的输出与以下内容类似:

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

提示

Legacy API

如果您使用的是传统 API,请参阅我们的常见问题页面,了解需要对该代码示例进行哪些更改。

有关此页面上所提及的类和方法的更多信息,请参阅以下资源:

后退

执行批量操作