Docs 菜单

Change Streams

变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。

从 MongoDB 5.1 开始,我们对变更流进行了优化,提高了资源利用率,并加快了某些聚合管道阶段的处理速度。

变更流可用于副本集分片集群

变更流包含在Stable API V1 中。 但是, Stable APIV 中不包含 showExpandedEvents 选项。1

连接 change stream 可以使用带有+srv连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。

如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。

有关更多信息,请参阅连接字符串 URI 格式。

可以针对如下情况打开变更流:

目标
说明

集合

可以为单个集合(除 system 集合,adminlocalconfig 数据库中的任何集合)打开变更流游标。

本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 mongosh 方法 db.collection.watch()

数据库

您可以为单个数据库(不包括 adminlocalconfig 数据库)打开变更流游标,以监视其所有非系统集合的更改。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 db.watch()

部署

您可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 adminlocalconfig 外)中对所有非系统集合的变更。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 Mongo.watch()

注意

变更流示例

本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。

如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。

在分片集群上打开变更流时:

  • mongos每个分片上创建单独的变更流。无论变更流是否针对特定的分片密钥范围,都会出现这种行为。

  • mongos 收到更改流结果时,它会对这些结果进行排序和筛选。如有必要,mongos 还会执行 fullDocument 查找。

为获得最佳性能,请在变更流中限制对 $lookup 查询的使用。

要打开变更流:

  • 对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。

  • 对于分片集群,必须从 mongos 中发出打开变更流操作。

以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


下面的C示例假定您已连接到MongoDB副本集并访问了包含 inventory集合的数据库。

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

以下 C# 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

以下 Go 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

以下 Java 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

下面的Kotlin示例假设您已连接到MongoDB副本集,并且可以访问权限包含inventory集合的数据库。 要学习;了解有关完成这些任务的更多信息,请参阅Kotlin协程驱动程序数据库和集合指南。

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

以下示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

cursor = db.inventory.watch()
document = await cursor.next()

以下 Node.js 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream 扩展了 EventEmitter

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

以下 Python 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = db.inventory.watch()
next(cursor)

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = inventory.watch.to_enum
next_change = cursor.next

下面的 Swift (异步) 示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

以下 Swift(同步)示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。

变更流游标保持打开状态,直到出现以下任一情况:

  • 游标已明确关闭。

  • 发生失效事件;例如删除或重命名集合。

  • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。

注意

未关闭游标的生命周期取决于语言。

[1] 您可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。

➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline列表包含一个$match阶段,用于筛选符合以下一个或两个条件的任何操作:

  • username值为alice

  • operationType值为delete

pipeline 传递给 watch() 方法会指示变更流在通过指定的 pipeline 传递通知后返回通知。

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

pipeline列表包含一个$match阶段,用于筛选符合以下一个或两个条件的任何操作:

  • username值为alice

  • operationType值为delete

pipeline 传递给 watch() 方法会指示变更流在通过指定的 pipeline 传递通知后返回通知。

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

以下示例使用流来处理变更事件。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

提示

变更流事件文档的_id字段充当恢复令牌。 请勿使用管道修改或删除变更流事件的_id字段。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


要返回已更新文档的当前多数提交版本,请将带有 "updateLookup" 值的 "fullDocument" 选项传递给 mongoc_collection_watch 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

要返回更新文档的最新多数提交版本,请将 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

要返回已更新文档的当前多数提交版本,请使用 SetFullDocument(options.UpdateLookup) 变更流选项。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

要返回更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP 传递给 db.collection.watch.fullDocument() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP 传递给 ChangeStreamFlow.fullDocument()方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 `full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' } 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup' 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

注意

如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。

但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。

如果以下任一条件为真,则更新事件的 fullDocument 字段可能会缺失:

  • 如果文档被删除,或者集合在更新和查找之间被删除。

  • 如果更新更改了该集合分片键中至少一个字段的值。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

在打开游标时将恢复令牌指定为 resumeAfter startAfter,借此恢复变更流。

您可以在打开游标时将恢复令牌传递给 resumeAfter ,从而在特定事件发生后恢复 change stream。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

在下面的示例中,resumeAfter 选项会附加到流选项,以便在流被销毁后重新创建流。将 _id 传递给变更流会尝试在指定的操作之后开始恢复通知。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

在下面的示例中,resumeToken 是从最后一个变更流文档中检索的,并作为选项传递给 Watch() 方法。将 resumeToken 传递给 Watch() 方法,会指示变更流尝试在恢复令牌中指定的操作之后开始恢复通知。

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

您可以使用ChangeStreamOptions.SetResumeAfter来指定变更流的恢复令牌。如果设置了 resumeAfter 选项,变更流将在恢复令牌中指定的操作后恢复通知。SetResumeAfter 采用的值必须解析为恢复令牌,例如以下示例中的的 resumeToken

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

您可以使用 resumeAfter() 方法在恢复令牌中指定的操作后恢复通知。resumeAfter() 方法采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

您可以使用 ChangeStreamFlow.resumeAfter()方法,以便在执行恢复令牌中指定的操作后恢复通知。 resumeAfter()方法采用的值必须解析为恢复令牌,示例下例中的resumeToken变量。

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 $resumeToken

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

您可在打开游标时将恢复令牌传递给 startAfter,从而在特定事件之后启动新的变更流。与 resumeAfter 不同,startAfter 可在出现无效事件之后通过创建新的变更流来恢复通知。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

恢复令牌可从多个来源获取:

说明

更改事件通知包含针对 _id 字段的恢复词元:

$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌。

该字段仅在使用 aggregate 命令时显示。

getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

提示

MongoDB 提供了“代码段”,这是 mongosh 的扩展,用于解码十六进制编码的恢复令牌。

您可以从 mongosh 安装并运行 resumetoken

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

如果系统上安装了 npm,那么您还可以在命令行中运行 resumetoken (并且不使用 mongosh):

npx mongodb-resumetoken-decoder <RESUME TOKEN>

请参阅以下内容了解详细信息:

更改事件通知包含针对 _id 字段的恢复令牌:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

使用 aggregate 命令时,$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 命令还在 cursor.postBatchResumeToken 字段中包含一个恢复令牌:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。

对于在自管理部署上执行身份验证授权的部署:

  • 要打开针对特定集合的变更流,应用程序必须具有对相应集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。

例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。

如果某个操作与事务相关联,则变更事件文档包括 txnNumberlsid

除非提供了显式排序规则,否则变更流使用 simple 二进制比较。

从 MongoDB 5.3 开始,在范围迁移期间,不会为孤立文档的更新生成变更流事件。

从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):

如果图像属于以下情况,则前像和后像不可用于变更流事件

  • 在文档更新或删除操作时未对集合启用。

  • expireAfterSeconds 中设置的前像和后像保留时间后之后被删除。

    • 以下示例将整个集群上的 expireAfterSeconds 设置为 100 秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 以下示例返回当前的 changeStreamOptions 设置,包括 expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds 设置为 off 可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。

    • 如果变更流事件从 oplog 中删除,则无论 expireAfterSeconds 前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。

其他考量:

  • 启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。

  • 将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:

    • 将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如 updateDescription)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。

    • 如果其他变更流事件字段(例如 updateDescription)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。

    • 在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:

      • 文档更新仅影响文档结构或内容的一小部分,

      • 不会引起 replace 变更事件。replace 事件始终包含后像。

  • 要请求前图像,请在db.collection.watch()中将fullDocumentBeforeChange设置为requiredwhenAvailable。要请求后图像,您可以使用相同的方法设置fullDocument

  • 前像被写入 config.system.preimages 集合。

    • config.system.preimages 集合可能会变大。要限制集合大小,可如前文所示为前映像设置 expireAfterSeconds 时间。

    • 前像由后台进程异步删除。

重要

向后不兼容的功能

从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod 命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。

提示

另请参阅:

有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流