注意更改
您可以通过打开变更流来跟踪 MongoDB 中的数据更改,例如对集群、数据库或部署的更改。变更流允许应用程序监视数据更改并作出响应。
发生更改时,变更流会返回变更事件文档。变更事件包含有关更新后数据的信息。
通过对 MongoCollection
、MongoDatabase
或 MongoClient
对象调用 watch()
方法来打开变更流,如以下代码示例所示:
ChangeStreamIterable<Document> changeStream = database.watch();
watch()
方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数,以筛选和转换变更事件输出,如下所示:
List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
watch()
方法返回 ChangeStreamIterable
类的一个实例,这个类提供访问、组织和遍历结果的多种方法。ChangeStreamIterable
还从其父类 MongoIterable
继承方法,该父类实施核心 Java 接口 Iterable
。
您可以对 ChangeStreamIterable
调用 forEach()
,从而在事件发生时对其进行处理,也可以使用 iterator()
方法返回可用于遍历结果的 MongoCursor
实例。
您可以调用 MongoCursor
上的方法,例如调用 hasNext()
来检查是否存在其他结果,调用 next()
来返回集合中的下一份文档,或者调用 tryNext()
来立即返回变更流中的下一个可用元素或 null
。与其他查询返回的 MongoCursor
不同,与变更流关联的 MongoCursor
会等到更改事件到达后才从 next()
返回结果。因此,使用变更流的 MongoCursor
调用 next()
永远不会引发 java.util.NoSuchElementException
。
要配置用于处理从变更流返回的文档的选项,请使用 watch()
返回的 ChangeStreamIterable
对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable
API 文档链接。
如何使用回调处理变更流事件
要从变更流捕获事件,请使用 回调 函数调用 forEach()
方法,如下所示:
changeStream.forEach(event -> System.out.println("Change observed: " + event));
回调 函数在发出更改事件时触发。您可以在 回调 中指定逻辑,以便在收到事件文档时对其进行处理。
重要
forEach() 会阻止当前线程
只要相应的变更流侦听事件,对 forEach()
的调用就会阻止当前线程。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在单独的线程中创建和监听您的变更流。
注意
对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP
调用ChangeStreamIterable
对象的fullDocument()
成员方法,如下所示:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
例子
以下示例使用两个单独的应用程序来演示如何使用变更流侦听更改:
第一个应用程序名为
Watch
,在sample_mflix
数据库中的movies
集合上打开变更流。Watch
使用聚合管道来筛选基于operationType
的更改,以便它仅接收插入和更新事件(删除被省略排除)。Watch
使用回调来接收和打印集合上发生的筛选更改事件。第二个应用程序名为
WatchCompanion
,它将单个文档插入到sample_mflix
数据库的movies
集合中。接下来,WatchCompanion
使用新的字段值更新文档。最后,WatchCompanion
删除该文档。
首先,运行 Watch
以打开集合上的变更流,并使用 forEach()
方法在变更流上定义回调。当 Watch
运行时,运行 WatchCompanion
以通过对集合执行更改来生成变更事件。
注意
此示例使用连接 URI 连接到MongoDB实例。 要学习;了解有关连接到MongoDB实例的更多信息,请参阅连接指南。
Watch
:
package usage.examples; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); // variables referenced in a lambda must be final; final array gives us a mutable integer final int[] numberOfEvents = {0}; changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion
:
package usage.examples; import java.util.Arrays; import org.bson.Document; import org.bson.types.ObjectId; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Success! Inserted document id: " + insertResult.getInsertedId()); UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
如果您按顺序运行前面的应用程序,则应看到 Watch
应用程序的输出与以下内容类似。仅打印 insert
和 update
操作,因为聚合管道会筛选出 delete
操作:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
您还应看到 WatchCompanion
应用程序的输出与以下内容类似:
Success! Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
有关此页面上所提及的类和方法的更多信息,请参阅以下资源:
变更流服务器手动条目
变更事件服务器手册条目
聚合管道服务器手册条目
聚合阶段服务器手册条目
ChangeStreamIterable API文档
MongoCollection.watch() API 文档
MongoDatabase.watch() API 文档
MongoClient.watch() API 文档