Change Streams
变更流允许应用程序访问实时数据更改,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。
可用性
存储引擎。
副本集和分片集群必须使用 WiredTiger 存储引擎。 Change stream 也可以用于采用 MongoDB 的静态加密功能的部署。
副本集协议版本。
副本集和分片集群必须使用副本集协议版本 1 (
pv1
)。读关注“多数”启用。
从 MongoDB 4.2 开始,无论是否有
"majority"
读关注支持,change stream 都可用;也就是说,读关注majority
支持可以启用(默认),也可以禁用以使用 change stream。在 MongoDB 4.0 及更早版本中,变更流仅在系统启用
"majority"
读关注支持(默认)的情况下才可用。
稳定的 API 支持
变更流包含在Stable API V1 中。
连接
连接 change stream 可以使用带有+srv
连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。
如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。
有关更多信息,请参阅连接字符串 URI 格式。
监视集合、数据库或部署
可以针对如下情况打开变更流:
目标 | 说明 |
---|---|
集合 | 可以为单个集合(除 本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 |
数据库 | 您可以为单个数据库(不包括 有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 |
部署 | 您可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 |
注意
变更流示例
本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。
变更流性能考量
如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。
打开变更流
要打开变更流:
对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。
对于分片集群,必须从
mongos
中发出打开变更流操作。
以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
以下 C 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库 。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下 C# 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下 Go 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下 Java 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
以下示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库。
cursor = db.inventory.watch() document = await cursor.next()
以下 Node.js 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory
集合的数据库。
以下示例使用流来处理变更事件。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下 Python 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cursor = db.inventory.watch() next(cursor)
以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory
集合的数据库。
cursor = inventory.watch.to_enum next_change = cursor.next
下面的 Swift (异步) 示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory
集合的数据库。
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下 Swift(同步)示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory
集合的数据库。
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。
变更流游标保持打开状态,直到出现以下任一情况:
游标已明确关闭。
发生失效事件;例如删除或重命名集合。
与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为。
如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。
注意
未关闭游标的生命周期取决于语言。
[1] | 您可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。 |
修改变更流输出
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipeline
列表包括一个$match
阶段,用于过滤username
为alice
的任何操作或operationType
为delete
的操作。
将 pipeline
传递给 watch()
方法会指示变更流在通过指定的 pipeline
传递通知后返回通知。
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
以下示例使用流来处理变更事件。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
查找更新操作的完整文档
默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
要返回已更新文档的当前多数提交版本,请将带有 "updateLookup"
值的 "fullDocument"
选项传递给 mongoc_collection_watch
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
要返回更新文档的最新多数提交版本,请将 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
要返回已更新文档的当前多数提交版本,请使用 SetFullDocument(options.UpdateLookup)
变更流选项。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
要返回已更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP
传递给 db.collection.watch.fullDocument()
方法。
在下面的示例中,所有更新操作通知都包含一个 FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 `full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' }
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
以下示例使用流来处理变更事件。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器处理变更事件:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
传递给 db.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 fullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup'
传递给 db.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个 full_document
字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
要返回已更新文档的最新多数提交版本,请将 options:
ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
要返回已更新文档的最新多数提交版本,请将 options:
ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。
但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。
请参阅变更事件以了解有关变更流响应文档格式的更多信息。
恢复变更流
在打开游标时将恢复令牌指定为 resumeAfter 或 startAfter,借此恢复变更流。
resumeAfter
对于 Change Stream
您可以在打开游标时将恢复令牌传递给 resumeAfter
,从而在特定事件发生后恢复 change stream。
请参阅恢复令牌以了解有关恢复令牌的更多信息。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
在下面的示例中,resumeAfter
选项会附加到流选项,以便在流被销毁后重新创建流。将 _id
传递给变更流会尝试在指定的操作之后开始恢复通知。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
在下面的示例中,resumeToken
是从最后一个变更流文档中检索的,并作为选项传递给 Watch()
方法。将 resumeToken
传递给 Watch()
方法,会指示变更流尝试在恢复令牌中指定的操作之后开始恢复通知。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
您可以使用ChangeStreamOptions.SetResumeAfter来指定变更流的恢复令牌。如果设置了 resumeAfter 选项,变更流将在恢复令牌中指定的操作后恢复通知。SetResumeAfter
采用的值必须解析为恢复令牌,例如以下示例中的的 resumeToken
。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
您可以使用 resumeAfter()
方法在恢复令牌中指定的操作后恢复通知。resumeAfter()
方法采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 $resumeToken
。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
您可以使用 resume_after
修饰符在恢复令牌中指定的操作后恢复通知。resume_after
修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token
。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
您可以使用 resumeAfter
选项在恢复令牌中指定的操作后恢复通知。resumeAfter
选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken
。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
对于 Change Stream
您可在打开游标时将恢复令牌传递给 startAfter
,从而在特定事件之后启动新的变更流。与 resumeAfter 不同,startAfter
可在出现无效事件之后通过创建新的变更流来恢复通知。
请参阅恢复令牌以了解有关恢复令牌的更多信息。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
恢复令牌
恢复令牌可从多个来源获取:
源 | 说明 |
---|---|
更改事件通知包含针对 _id 字段的恢复词元: | |
该字段仅在使用 | |
getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。 |
从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。
从变更事件中恢复令牌
更改事件通知包含针对 _id
字段的恢复令牌:
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
来自以下项的恢复令牌: aggregate
使用 aggregate
命令时,$changeStream
聚合阶段在 cursor.postBatchResumeToken
字段中包含恢复令牌:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
来自以下项的恢复令牌: getMore
getMore
命令还在 cursor.postBatchResumeToken
字段中包含一个恢复令牌:
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
用例
变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。
访问控制
对于在自管理部署上执行身份验证和授权的部署:
要打开针对特定集合的变更流,应用程序必须具有对相应集合授予
changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
事件通知
变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。
例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。
如果某个操作与事务相关联,则变更事件文档包括 txnNumber
和 lsid
。
排序规则
除非提供了显式排序规则,否则变更流使用 simple
二进制比较。