Docs 菜单
Docs 主页
/
MongoDB Manual
/ /

执行长期运行的快照查询

在此页面上

  • 比较本地读关注和快照读关注
  • 示例
  • 从同一时间点运行相关查询
  • 从过去某个时间点的数据的一致状态读取
  • 配置快照保留
  • 磁盘空间和历史记录

快照查询允许您读取最近某个时间点出现的数据。

从MongoDB 5.0开始,您可以使用读关注(read concern) "snapshot"来查询从从节点(secondary node from replica set)上的数据。 此功能提高了应用程序读取的多功能性和韧性。 您无需创建数据的静态副本,将其移至单独的系统中,也无需手动隔离这些长时间运行的查询,以免干扰操作工作负载。 相反,您可以对实时事务性数据库执行长时间运行的查询,同时读取一致状态的数据。

在从节点(secondary node from replica set)节点上使用读关注(read concern)"snapshot"不会影响应用程序的写入工作负载。 只有应用程序读取受益于隔离到从节点的长时间运行查询。

当您希望达到以下目标时,请使用快照查询:

  • 执行多个相关查询,并确保每个查询从同一时间点读取数据。

  • 确保您从过去某个点读取的数据处于一致状态。

当 MongoDB 使用默认的 "local" 读关注执行长时间运行的查询时,查询结果可能包含与查询同时发生的写入数据。因此,查询可能会返回意外或不一致的结果。

为避免这种情况,请创建一个会话并指定读关注(read concern)"snapshot" 。 使用读关注(read concern)"snapshot"时, MongoDB使用快照隔离性来运行查询,这意味着您的查询将读取最近单个时间点出现的数据。

本页上的示例展示了如何使用快照查询来:

  • 从同一时间点运行相关查询

  • 从过去某个时间点的数据的一致状态读取

读关注 "snapshot" 允许您在会话中运行多个相关查询,并确保每个查询从同一时间点读取数据。

动物收容所有一个pets数据库,其中包含每种宠物的集合。 pets数据库包含以下集合:

  • cats

  • dogs

每个集合中的每个文档都包含一个 adoptable 字段,指示该宠物是否可以被领养。例如,cats 集合中的文档如下所示:

{
"name": "Whiskers",
"color": "white",
"age": 10,
"adoptable": true
}

您想运行一个查询,查看所有集合中可供领养的宠物总数。为了提供一致的数据视图,您需要确保每个集合返回的数据来自同一时间点。

要实现此目标,请在会话中使用读关注(read concern)"snapshot"

mongoc_client_session_t *cs = NULL;
mongoc_collection_t *cats_collection = NULL;
mongoc_collection_t *dogs_collection = NULL;
int64_t adoptable_pets_count = 0;
bson_error_t error;
mongoc_session_opt_t *session_opts;
cats_collection = mongoc_client_get_collection (client, "pets", "cats");
dogs_collection = mongoc_client_get_collection (client, "pets", "dogs");
/* Seed 'pets.cats' and 'pets.dogs' with example data */
if (!pet_setup (cats_collection, dogs_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_snapshot (session_opts, true);
cs = mongoc_client_start_session (client, session_opts, &error);
mongoc_session_opts_destroy (session_opts);
if (!cs) {
MONGOC_ERROR ("Could not start session: %s", error.message);
goto cleanup;
}
/*
* Perform the following aggregation pipeline, and accumulate the count in
* `adoptable_pets_count`.
*
* adoptablePetsCount = db.cats.aggregate(
* [ { "$match": { "adoptable": true } },
* { "$count": "adoptableCatsCount" } ], session=s
* ).next()["adoptableCatsCount"]
*
* adoptablePetsCount += db.dogs.aggregate(
* [ { "$match": { "adoptable": True} },
* { "$count": "adoptableDogsCount" } ], session=s
* ).next()["adoptableDogsCount"]
*
* Remember in order to apply the client session to
* this operation, you must append the client session to the options passed
* to `mongoc_collection_aggregate`, i.e.,
*
* mongoc_client_session_append (cs, &opts, &error);
* cursor = mongoc_collection_aggregate (
* collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
*/
accumulate_adoptable_count (cs, cats_collection, &adoptable_pets_count);
accumulate_adoptable_count (cs, dogs_collection, &adoptable_pets_count);
printf ("there are %" PRId64 " adoptable pets\n", adoptable_pets_count);
using namespace mongocxx;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
auto db = client["pets"];
int64_t adoptable_pets_count = 0;
auto opts = mongocxx::options::client_session{};
opts.snapshot(true);
auto session = client.start_session(opts);
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableCatsCount");
auto cursor = db["cats"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableCatsCount")->get_int32();
}
}
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableDogsCount");
auto cursor = db["dogs"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableDogsCount")->get_int32();
}
}
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var adoptablePetsCount int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the adoptable cats
const adoptableCatsOutput = "adoptableCatsCount"
cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableCatsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(adoptableCatsOutput)
adoptableCatsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableCatsOutput, cursor.Current)
}
adoptablePetsCount += adoptableCatsCount
// Count the adoptable dogs
const adoptableDogsOutput = "adoptableDogsCount"
cursor, err = db.Collection("dogs").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableDogsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp = cursor.Current.Lookup(adoptableDogsOutput)
adoptableDogsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableDogsOutput, cursor.Current)
}
adoptablePetsCount += adoptableDogsCount
return nil
})
if err != nil {
return err
}
db = client.pets
async with await client.start_session(snapshot=True) as s:
adoptablePetsCount = 0
docs = await db.cats.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s
).to_list(None)
adoptablePetsCount = docs[0]["adoptableCatsCount"]
docs = await db.dogs.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s
).to_list(None)
adoptablePetsCount += docs[0]["adoptableDogsCount"]
print(adoptablePetsCount)
$catsCollection = $client->selectCollection('pets', 'cats');
$dogsCollection = $client->selectCollection('pets', 'dogs');
$session = $client->startSession(['snapshot' => true]);
$adoptablePetsCount = $catsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableCatsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableCatsCount;
$adoptablePetsCount += $dogsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableDogsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableDogsCount;
var_dump($adoptablePetsCount);
db = client.pets
with client.start_session(snapshot=True) as s:
adoptablePetsCount = db.cats.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s
).next()["adoptableCatsCount"]
adoptablePetsCount += db.dogs.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s
).next()["adoptableDogsCount"]
print(adoptablePetsCount)
client = Mongo::Client.new(uri_string, database: "pets")
client.start_session(snapshot: true) do |session|
adoptable_pets_count = client['cats'].aggregate([
{ "$match": { "adoptable": true } },
{ "$count": "adoptable_cats_count" }
], session: session).first["adoptable_cats_count"]
adoptable_pets_count += client['dogs'].aggregate([
{ "$match": { "adoptable": true } },
{ "$count": "adoptable_dogs_count" }
], session: session).first["adoptable_dogs_count"]
puts adoptable_pets_count
end

前面的一系列命令:

  • 使用 MongoClient() 建立与 MongoDB 部署的连接。

  • 切换到 pets 数据库。

  • 建立会话。该命令指定 snapshot=True,因此会话使用读关注 "snapshot"

  • pets 数据库中的每个集合执行以下操作:

    • 使用 $match 过滤 adoptable 字段为 True 的文档。

    • 使用$count返回已筛选文档的计数。

    • 用数据库中的计数递增adoptablePetsCount变量。

  • 打印 adoptablePetsCount 变量。

会话中的所有查询都会读取同一时间点出现的数据。 因此,最终计数反映了数据的一致快照。

注意

如果会话持续时间超过WiredTiger历史记录保留期(默认为300秒),则查询出错并显示SnapshotTooOld错误。 要学习;了解如何配置快照保留和启用长时间运行的查询,请参阅配置快照保留。

读关注 "snapshot" 确保您的查询读取最近某个时间点出现的数据。

一家网上鞋店有一个 sales 集合,其中包含该店出售的每件商品的数据。例如,sales 集合中的文档如下所示:

{
"shoeType": "boot",
"price": 30,
"saleDate": ISODate("2022-02-02T06:01:17.171Z")
}

每天午夜,系统都会运行查询以查看当天售出了多少双鞋。 每日销售查询如下所示:

mongoc_client_session_t *cs = NULL;
mongoc_collection_t *sales_collection = NULL;
bson_error_t error;
mongoc_session_opt_t *session_opts;
bson_t *pipeline = NULL;
bson_t opts = BSON_INITIALIZER;
mongoc_cursor_t *cursor = NULL;
const bson_t *doc = NULL;
bool ok = true;
bson_iter_t iter;
int64_t total_sales = 0;
sales_collection = mongoc_client_get_collection (client, "retail", "sales");
/* seed 'retail.sales' with example data */
if (!retail_setup (sales_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_snapshot (session_opts, true);
cs = mongoc_client_start_session (client, session_opts, &error);
mongoc_session_opts_destroy (session_opts);
if (!cs) {
MONGOC_ERROR ("Could not start session: %s", error.message);
goto cleanup;
}
if (!mongoc_client_session_append (cs, &opts, &error)) {
MONGOC_ERROR ("could not apply session options: %s", error.message);
goto cleanup;
}
pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"$expr",
"{",
"$gt",
"[",
"$saleDate",
"{",
"$dateSubtract",
"{",
"startDate",
"$$NOW",
"unit",
BCON_UTF8 ("day"),
"amount",
BCON_INT64 (1),
"}",
"}",
"]",
"}",
"}",
"}",
"{",
"$count",
BCON_UTF8 ("totalDailySales"),
"}",
"]");
cursor = mongoc_collection_aggregate (sales_collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
bson_destroy (&opts);
ok = mongoc_cursor_next (cursor, &doc);
if (mongoc_cursor_error (cursor, &error)) {
MONGOC_ERROR ("could not get totalDailySales: %s", error.message);
goto cleanup;
}
if (!ok) {
MONGOC_ERROR ("%s", "cursor has no results");
goto cleanup;
}
ok = bson_iter_init_find (&iter, doc, "totalDailySales");
if (ok) {
total_sales = bson_iter_as_int64 (&iter);
} else {
MONGOC_ERROR ("%s", "missing key: 'totalDailySales'");
goto cleanup;
}
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var totalDailySales int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the total daily sales
const totalDailySalesOutput = "totalDailySales"
cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match",
bson.D{{"$expr",
bson.D{{"$gt",
bson.A{"$saleDate",
bson.D{{"$dateSubtract",
bson.D{
{"startDate", "$$NOW"},
{"unit", "day"},
{"amount", 1},
},
}},
},
}},
}},
}},
bson.D{{"$count", totalDailySalesOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(totalDailySalesOutput)
var ok bool
totalDailySales, ok = resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", totalDailySalesOutput, cursor.Current)
}
return nil
})
if err != nil {
return err
}
db = client.retail
async with await client.start_session(snapshot=True) as s:
docs = await db.sales.aggregate(
[
{
"$match": {
"$expr": {
"$gt": [
"$saleDate",
{
"$dateSubtract": {
"startDate": "$$NOW",
"unit": "day",
"amount": 1,
}
},
]
}
}
},
{"$count": "totalDailySales"},
],
session=s,
).to_list(None)
total = docs[0]["totalDailySales"]
print(total)
$salesCollection = $client->selectCollection('retail', 'sales');
$session = $client->startSession(['snapshot' => true]);
$totalDailySales = $salesCollection->aggregate(
[
[
'$match' => [
'$expr' => [
'$gt' => ['$saleDate', [
'$dateSubtract' => [
'startDate' => '$$NOW',
'unit' => 'day',
'amount' => 1,
],
],
],
],
],
],
['$count' => 'totalDailySales'],
],
['session' => $session],
)->toArray()[0]->totalDailySales;
db = client.retail
with client.start_session(snapshot=True) as s:
db.sales.aggregate(
[
{
"$match": {
"$expr": {
"$gt": [
"$saleDate",
{
"$dateSubtract": {
"startDate": "$$NOW",
"unit": "day",
"amount": 1,
}
},
]
}
}
},
{"$count": "totalDailySales"},
],
session=s,
).next()["totalDailySales"]

前面的查询:

  • 使用$match$expr指定针对saleDate字段的过滤。

  • 使用 $gt 操作符和 $dateSubtract 表达式返回 saleDate 于执行查询前一天的文档。

  • 使用$count返回匹配文档的计数。 计数存储在totalDailySales变量中。

  • 指定读关注(read concern)"snapshot"以确保查询从单个时间点读取。

sales 集合非常大,因此此查询可能需要几分钟才能完成运行。由于商店是在线的,一天中的任何时间都可以进行销售。

例如,考虑如果:

  • 查询于12 : 00 AM 开始执行。

  • 某客户于12 : 02 AM 购买了三双鞋。

  • 查询于12 : 04 AM 结束执行。

如果查询不使用读关注 "snapshot",则在查询开始和查询结束之间发生的销售额可以计入查询次数,尽管这些销售额不是在报告当天发生的。这可能导致报告不准确,一些销售额会被重复计算。

通过指定读关注(read concern)"snapshot" ,查询仅返回在查询开始执行前不久数据库中存在的数据。

注意

如果查询花费的时间超过WiredTiger历史记录保留期(默认为300秒),则查询出错并显示SnapshotTooOld错误。 要学习;了解如何配置快照保留和启用长时间运行的查询,请参阅配置快照保留。

默认情况下, WiredTiger storage engine将历史记录保留 300 秒。 从会话中第一个操作到最后一个操作,您可以使用带有snapshot=true的会话总共300秒。 如果使用该会话的时间较长,则会话将失败并显示SnapshotTooOld错误。 同样,如果使用读关注"snapshot"查询数据,并且查询持续时间超过300秒,则查询将失败。

如果查询或会话运行时间超过300秒,请考虑增加快照保留期。 要延长保留期,请修改minSnapshotHistoryWindowInSeconds参数。

示例,此命令将minSnapshotHistoryWindowInSeconds的值设置为600秒:

db.adminCommand( { setParameter: 1, minSnapshotHistoryWindowInSeconds: 600 } )

重要

增大 minSnapshotHistoryWindowInSeconds 的值会增加磁盘使用量,因为服务器必须在指定的时间窗口内维护早期已修改值的历史记录。使用的磁盘空间量取决于工作负载,工作负载量越大,需要的磁盘空间越多。

后退

超时