Docs 菜单
Docs 主页
/
MongoDB Manual

Change Streams

在此页面上

  • 可用性
  • 连接
  • 监视集合、数据库或部署
  • 变更流性能考量
  • 打开变更流
  • 修改变更流输出
  • 查找更新操作的完整文档
  • 恢复变更流
  • 用例
  • 访问控制
  • 事件通知
  • 排序规则

变更流允许应用程序访问实时数据更改,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。

变更流可用于副本集分片集群

变更流包含在Stable API V1 中。

连接 change stream 可以使用带有+srv连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。

如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。

有关更多信息,请参阅连接字符串 URI 格式。

可以针对如下情况打开变更流:

目标
说明

集合

可以为单个集合(除 system 集合,adminlocalconfig 数据库中的任何集合)打开变更流游标。

本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 mongosh 方法 db.collection.watch()

数据库

您可以为单个数据库(不包括 adminlocalconfig 数据库)打开变更流游标,以监视其所有非系统集合的更改。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 db.watch()

部署

您可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 adminlocalconfig 外)中对所有非系统集合的变更。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 Mongo.watch()

注意

变更流示例

本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。

如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。

要打开变更流:

  • 对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。

  • 对于分片集群,必须从 mongos 中发出打开变更流操作。

以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


下面的C示例假定您已连接到MongoDB副本集并访问了包含 inventory集合的数据库。

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

以下C#示例假定您已连接到MongoDB副本集并访问了包含 集合的数据库 inventory

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

以下 Go 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

以下 Java 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

以下示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

cursor = db.inventory.watch()
document = await cursor.next()

以下 Node.js 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

以下 Python 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = db.inventory.watch()
next(cursor)

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = inventory.watch.to_enum
next_change = cursor.next

下面的 Swift (异步) 示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

以下 Swift(同步)示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。

变更流游标保持打开状态,直到出现以下任一情况:

  • 游标已明确关闭。

  • 发生失效事件;例如删除或重命名集合。

  • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

注意

未关闭游标的生命周期取决于语言。

[1] 您可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。

➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline列表包括一个$match阶段,用于过滤usernamealice的任何操作或operationTypedelete的操作。

pipeline 传递给 watch() 方法会指示变更流在通过指定的 pipeline 传递通知后返回通知。

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

以下示例使用流来处理变更事件。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

提示

变更流事件文档的_id字段充当恢复令牌。 请勿使用管道修改或删除变更流事件的_id字段。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


要返回已更新文档的当前多数提交版本,请将带有 "updateLookup" 值的 "fullDocument" 选项传递给 mongoc_collection_watch 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

要返回更新文档的最新多数提交版本,请将 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

要返回已更新文档的当前多数提交版本,请使用 SetFullDocument(options.UpdateLookup) 变更流选项。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

要返回已更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP 传递给 db.collection.watch.fullDocument() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 `full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' } 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup' 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

注意

如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。

但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

在打开游标时将恢复令牌指定为 resumeAfter startAfter,借此恢复变更流。

您可以在打开游标时将恢复令牌传递给 resumeAfter ,从而在特定事件发生后恢复 change stream。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

在下面的示例中,resumeAfter 选项会附加到流选项,以便在流被销毁后重新创建流。将 _id 传递给变更流会尝试在指定的操作之后开始恢复通知。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

在下面的示例中,resumeToken 是从最后一个变更流文档中检索的,并作为选项传递给 Watch() 方法。将 resumeToken 传递给 Watch() 方法,会指示变更流尝试在恢复令牌中指定的操作之后开始恢复通知。

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

您可以使用ChangeStreamOptions.SetResumeAfter来指定变更流的恢复令牌。如果设置了 resumeAfter 选项,变更流将在恢复令牌中指定的操作后恢复通知。SetResumeAfter 采用的值必须解析为恢复令牌,例如以下示例中的的 resumeToken

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

您可以使用 resumeAfter() 方法在恢复令牌中指定的操作后恢复通知。resumeAfter() 方法采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 $resumeToken

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

您可在打开游标时将恢复令牌传递给 startAfter,从而在特定事件之后启动新的变更流。与 resumeAfter 不同,startAfter 可在出现无效事件之后通过创建新的变更流来恢复通知。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

恢复令牌可从多个来源获取:

说明

更改事件通知包含针对 _id 字段的恢复词元:

$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌。

该字段仅在使用 aggregate 命令时显示。

getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

更改事件通知包含针对 _id 字段的恢复令牌:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

使用 aggregate 命令时,$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 命令还在 cursor.postBatchResumeToken 字段中包含一个恢复令牌:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。

对于在自管理部署上执行身份验证授权的部署:

  • 要打开针对特定集合的变更流,应用程序必须具有对相应集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。

例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。

如果某个操作与事务相关联,则变更事件文档包括 txnNumberlsid

除非提供了显式排序规则,否则变更流使用 simple 二进制比较。

后退

时间序列