Change Streams
MongoDB 3.6引入了 $changeStream
聚合管道操作符。
变更流提供了一种监视集合中文档变更的方法。 为了提高此新阶段的可用性, MongoCollection
类型包含新的watch()
方法。 ChangeStreamPublisher
实例会设置变更流,并在遇到可能可恢复的错误时自动尝试恢复。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
一个
test.restaurants
restaurants.json
集合,其中填充了来自 文档资产Github 中的 文件的文档。以下 import 语句:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document;
重要
本指南使用Subscriber
实现,如快速入门入门知识中所述。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase
和MongoCollection
实例。
以下代码连接到在端口27017
上的localhost
上运行的独立 MongoDB 部署。 然后,定义database
变量以引用test
数据库,并collection
变量以引用restaurants
集合:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
监视集合的更改
要创建变更流,请使用MongoCollection.watch()
方法之一。
在以下示例中,变更流会打印出其观察到的所有更改:
collection.watch().subscribe(new PrintDocumentSubscriber());
监视数据库更改
应用程序可以打开单个变更流来监视数据库的所有非系统集合。 要创建此类变更流,请使用MongoDatabase.watch()
方法之一。
在以下示例中,变更流会打印出它在给定数据库上观察到的所有变更:
database.watch().subscribe(new PrintDocumentSubscriber());
监视所有数据库的更改
应用程序可以打开单个变更流来监视 MongoDB 部署中所有数据库的所有非系统集合。 要创建此类变更流,请使用MongoClient.watch()
方法之一。
在以下示例中,变更流打印出它在MongoClient
连接的部署中观察到的所有更改:
client.watch().subscribe(new PrintDocumentSubscriber());
过滤内容
您可以向watch()
方法传递聚合阶段列表,以修改$changeStream
操作符返回的数据。
注意
并非所有聚合操作符都受支持。 请参阅Change Streams MongoDB Server手册中的 以了解更多信息。
在以下示例中,变更流会打印出其观察到的与insert
、 update
、 replace
和delete
操作相对应的所有更改。
首先,管道包括一个$match
阶段,用于过滤operationType
为insert
、 update
、 replace
或delete
的文档。 然后,它将fullDocument
设置为FullDocument.UPDATE_LOOKUP
,以便更新后的文档包含在结果中:
collection.watch( asList( Aggregates.match( Filters.in("operationType", asList("insert", "update", "replace", "delete")) ) ) ).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new PrintDocumentSubscriber());