Change Streams
变更流允许应用程序访问实时数据更改,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。
从 MongoDB 5.1 开始,我们对变更流进行了优化,提高了资源利用率,并加快了某些聚合管道阶段的处理速度。
可用性
存储引擎。
副本集和分片集群必须使用 WiredTiger 存储引擎。 Change stream 也可以用于采用 MongoDB 的静态加密功能的部署。
副本集协议版本。
副本集和分片集群必须使用副本集协议版本 1 (
pv1
)。读关注“多数”启用。
无论是否支持
"majority"
读关注, 变更流 都可用;也就是说,可以启用(默认)或majority
禁用 读关注 支持,以使用变更流。
稳定的 API 支持
变更流包含在Stable API V1 中。 但是, Stable APIV 中不包含 showExpandedEvents 选项。1
连接
连接 change stream 可以使用带有+srv
连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。
如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。
有关更多信息,请参阅连接字符串 URI 格式。
监视集合、数据库或部署
可以针对如下情况打开变更流:
目标 | 说明 |
---|---|
集合 | 可以为单个集合(除 本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 |
数据库 | 您可以为单个数据库(不包括 有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 |
部署 | 您可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 |
注意
变更流示例
本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。
变更流性能考量
如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。
分片集群注意事项
在分片集群上打开变更流时:
mongos
在每个分片上创建单独的变更流。无论变更流是否针对特定的分片密钥范围,都会出现这种行为。当
mongos
收到更改流结果时,它会对这些结果进行排序和筛选。如有必要,mongos
还会执行fullDocument
查找。
为获得最佳性能,请在变更流中限制对 $lookup
查询的使用。
打开变更流
要打开变更流:
对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。
对于分片集群,必须从
mongos
中发出打开变更流操作。
以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
以下 C 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库 。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下 C# 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下 Go 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下 Java 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
下面的Kotlin示例假设您已连接到MongoDB副本集,并且可以访问权限包含inventory
集合的数据库。 要学习;了解有关完成这些任务的更多信息,请参阅Kotlin协程驱动程序数据库和集合指南。
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
以下示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库。
cursor = db.inventory.watch() document = await cursor.next()
以下 Node.js 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库。
以下示例使用流来处理变更事件。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
ChangeStream 扩展了 EventEmitter。
以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下 Python 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cursor = db.inventory.watch() next(cursor)
以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cursor = inventory.watch.to_enum next_change = cursor.next
下面的 Swift (异步) 示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory
集合的数据库。
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下 Swift(同步)示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory
集合的数据库。
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。
变更流游标保持打开状态,直到出现以下任一情况:
游标已明确关闭。
发生失效事件;例如删除或重命名集合。
与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为。
如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
未关闭游标的生命周期取决于语言。
[1] | 您可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。 |
修改变更流输出
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
列表包含一个$match
阶段,用于筛选符合以下一个或两个条件的任何操作:
username
值为alice
operationType
值为delete
将 pipeline
传递给 watch()
方法会指示变更流在通过指定的 pipeline
传递通知后返回通知。
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
pipeline
列表包含一个$match
阶段,用于筛选符合以下一个或两个条件的任何操作:
username
值为alice
operationType
值为delete
将 pipeline
传递给 watch()
方法会指示变更流在通过指定的 pipeline
传递通知后返回通知。
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
以下示例使用流来处理变更事件。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
查找更新操作的完整文档
默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
要返回已更新文档的当前多数提交版本,请将带有 "updateLookup"
值的 "fullDocument"
选项传递给 mongoc_collection_watch
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
要返回更新文档的最新多数提交版本,请将 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
要返回已更新文档的当前多数提交版本,请使用 SetFullDocument(options.UpdateLookup)
变更流选项。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
要返回已更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP
传递给 db.collection.watch.fullDocument()
方法。
在下面的示例中,所有更新操作通知都包含一个 FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP
传递给 ChangeStreamFlow.fullDocument()方法。
在下面的示例中,所有更新操作通知都包含一个 FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 `full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' }
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
以下示例使用流来处理变更事件。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
传递给 db.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup'
传递给 db.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
要返回已更新文档的最新多数提交版本,请将 options:
ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
要返回已更新文档的最新多数提交版本,请将 options:
ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。
但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。
如果以下任一条件为真,则更新事件的 fullDocument
字段可能会缺失:
如果文档被删除,或者集合在更新和查找之间被删除。
如果更新更改了该集合分片键中至少一个字段的值。
请参阅变更事件以了解有关变更流响应文档格式的更多信息。
恢复变更流
在打开游标时将恢复令牌指定为 resumeAfter 或 startAfter,借此恢复变更流。
resumeAfter
对于 Change Stream
您可以在打开游标时将恢复令牌传递给 resumeAfter
,从而在特定事件发生后恢复 change stream。
请参阅恢复令牌以了解有关恢复令牌的更多信息。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
在下面的示例中,resumeAfter
选项会附加到流选项,以便在流被销毁后重新创建流。将 _id
传递给变更流会尝试在指定的操作之后开始恢复通知。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
在下面的示例中,resumeToken
是从最后一个变更流文档中检索的,并作为选项传递给 Watch()
方法。将 resumeToken
传递给 Watch()
方法,会指示变更流尝试在恢复令牌中指定的操作之后开始恢复通知。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
您可以使用ChangeStreamOptions.SetResumeAfter来指定变更流的恢复令牌。如果设置了 resumeAfter 选项,变更流将在恢复令牌中指定的操作后恢复通知。SetResumeAfter
采用的值必须解析为恢复令牌,例如以下示例中的的 resumeToken
。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
您可以使用 resumeAfter()
方法在恢复令牌中指定的操作后恢复通知。resumeAfter()
方法采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
您可以使用 ChangeStreamFlow.resumeAfter()方法,以便在执行恢复令牌中指定的操作后恢复通知。 resumeAfter()
方法采用的值必须解析为恢复令牌,示例下例中的resumeToken
变量。
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 $resumeToken
。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
对于 Change Stream
您可在打开游标时将恢复令牌传递给 startAfter
,从而在特定事件之后启动新的变更流。与 resumeAfter 不同,startAfter
可在出现无效事件之后通过创建新的变更流来恢复通知。
请参阅恢复令牌以了解有关恢复令牌的更多信息。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
恢复令牌
恢复令牌可从多个来源获取:
源 | 说明 |
---|---|
更改事件通知包含针对 _id 字段的恢复词元: | |
该字段仅在使用 | |
getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。 |
从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。
提示
MongoDB 提供了“代码段”,这是 mongosh
的扩展,用于解码十六进制编码的恢复令牌。
您可以从 mongosh
安装并运行 resumetoken:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
如果系统上安装了 npm
,那么您还可以在命令行中运行 resumetoken (并且不使用 mongosh
):
npx mongodb-resumetoken-decoder <RESUME TOKEN>
请参阅以下内容了解详细信息:
从变更事件中恢复令牌
更改事件通知包含针对 _id
字段的恢复令牌:
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
来自以下项的恢复令牌: aggregate
使用 aggregate
命令时,$changeStream
聚合阶段在 cursor.postBatchResumeToken
字段中包含恢复令牌:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
来自以下项的恢复令牌: getMore
getMore
命令还在 cursor.postBatchResumeToken
字段中包含一个恢复令牌:
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
用例
变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。
访问控制
对于在自管理部署上执行身份验证和授权的部署:
要打开针对特定集合的变更流,应用程序必须具有对相应集合授予
changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
事件通知
变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。
例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。
如果某个操作与事务相关联,则变更事件文档包括 txnNumber
和 lsid
。
排序规则
除非提供了显式排序规则,否则变更流使用 simple
二进制比较。
变更流和孤立文档
附带文档前映像和后映像的变更流
从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):
前映像是指被替换、更新或删除之前的文档。已插入的文档没有前映像。
后图像是插入、替换或更新后的文档。 已删除的文档没有后图像。
使用
db.createCollection()
、create
或collMod
为集合启用changeStreamPreAndPostImages
。
如果图像属于以下情况,则前像和后像不可用于变更流事件:
在文档更新或删除操作时未对集合启用。
在
expireAfterSeconds
中设置的前像和后像保留时间后之后被删除。以下示例将整个集群上的
expireAfterSeconds
设置为100
秒:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 以下示例返回当前的
changeStreamOptions
设置,包括expireAfterSeconds
:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) 将
expireAfterSeconds
设置为off
可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。如果变更流事件从 oplog 中删除,则无论
expireAfterSeconds
前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。
其他考量:
启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。
将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:
将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如
updateDescription
)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。如果其他变更流事件字段(例如
updateDescription
)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:
文档更新仅影响文档结构或内容的一小部分,且
不会引起
replace
变更事件。replace
事件始终包含后像。
要请求前图像,请在
db.collection.watch()
中将fullDocumentBeforeChange
设置为required
或whenAvailable
。要请求后图像,您可以使用相同的方法设置fullDocument
。前像被写入
config.system.preimages
集合。config.system.preimages
集合可能会变大。要限制集合大小,可如前文所示为前映像设置expireAfterSeconds
时间。前像由后台进程异步删除。
重要
向后不兼容的功能
从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod
命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。
有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流。