执行长期运行的快照查询
快照查询允许您读取最近某个时间点出现的数据。
从MongoDB 5.0开始,您可以使用读关注(read concern) "snapshot"
来查询从从节点(secondary node from replica set)上的数据。 此功能提高了应用程序读取的多功能性和韧性。 您无需创建数据的静态副本,将其移至单独的系统中,也无需手动隔离这些长时间运行的查询,以免干扰操作工作负载。 相反,您可以对实时事务性数据库执行长时间运行的查询,同时读取一致状态的数据。
在从节点(secondary node from replica set)节点上使用读关注(read concern)"snapshot"
不会影响应用程序的写入工作负载。 只有应用程序读取受益于隔离到从节点的长时间运行查询。
当您希望达到以下目标时,请使用快照查询:
执行多个相关查询,并确保每个查询从同一时间点读取数据。
确保您从过去某个点读取的数据处于一致状态。
比较本地读关注和快照读关注
当 MongoDB 使用默认的 "local"
读关注执行长时间运行的查询时,查询结果可能包含与查询同时发生的写入数据。因此,查询可能会返回意外或不一致的结果。
为避免这种情况,请创建一个会话并指定读关注(read concern)"snapshot"
。 使用读关注(read concern)"snapshot"
时, MongoDB使用快照隔离性来运行查询,这意味着您的查询将读取最近单个时间点出现的数据。
示例
本页上的示例展示了如何使用快照查询来:
从同一时间点运行相关查询
读关注 "snapshot"
允许您在会话中运行多个相关查询,并确保每个查询从同一时间点读取数据。
动物收容所有一个pets
数据库,其中包含每种宠物的集合。 pets
数据库包含以下集合:
cats
dogs
每个集合中的每个文档都包含一个 adoptable
字段,指示该宠物是否可以被领养。例如,cats
集合中的文档如下所示:
{ "name": "Whiskers", "color": "white", "age": 10, "adoptable": true }
您想运行一个查询,查看所有集合中可供领养的宠物总数。为了提供一致的数据视图,您需要确保每个集合返回的数据来自同一时间点。
要实现此目标,请在会话中使用读关注(read concern)"snapshot"
:
mongoc_client_session_t *cs = NULL; mongoc_collection_t *cats_collection = NULL; mongoc_collection_t *dogs_collection = NULL; int64_t adoptable_pets_count = 0; bson_error_t error; mongoc_session_opt_t *session_opts; cats_collection = mongoc_client_get_collection (client, "pets", "cats"); dogs_collection = mongoc_client_get_collection (client, "pets", "dogs"); /* Seed 'pets.cats' and 'pets.dogs' with example data */ if (!pet_setup (cats_collection, dogs_collection)) { goto cleanup; } /* start a snapshot session */ session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_snapshot (session_opts, true); cs = mongoc_client_start_session (client, session_opts, &error); mongoc_session_opts_destroy (session_opts); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); goto cleanup; } /* * Perform the following aggregation pipeline, and accumulate the count in * `adoptable_pets_count`. * * adoptablePetsCount = db.cats.aggregate( * [ { "$match": { "adoptable": true } }, * { "$count": "adoptableCatsCount" } ], session=s * ).next()["adoptableCatsCount"] * * adoptablePetsCount += db.dogs.aggregate( * [ { "$match": { "adoptable": True} }, * { "$count": "adoptableDogsCount" } ], session=s * ).next()["adoptableDogsCount"] * * Remember in order to apply the client session to * this operation, you must append the client session to the options passed * to `mongoc_collection_aggregate`, i.e., * * mongoc_client_session_append (cs, &opts, &error); * cursor = mongoc_collection_aggregate ( * collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL); */ accumulate_adoptable_count (cs, cats_collection, &adoptable_pets_count); accumulate_adoptable_count (cs, dogs_collection, &adoptable_pets_count); printf ("there are %" PRId64 " adoptable pets\n", adoptable_pets_count);
using namespace mongocxx; using bsoncxx::builder::basic::kvp; using bsoncxx::builder::basic::make_document; auto db = client["pets"]; int64_t adoptable_pets_count = 0; auto opts = mongocxx::options::client_session{}; opts.snapshot(true); auto session = client.start_session(opts); { pipeline p; p.match(make_document(kvp("adoptable", true))).count("adoptableCatsCount"); auto cursor = db["cats"].aggregate(session, p); for (auto doc : cursor) { adoptable_pets_count += doc.find("adoptableCatsCount")->get_int32(); } } { pipeline p; p.match(make_document(kvp("adoptable", true))).count("adoptableDogsCount"); auto cursor = db["dogs"].aggregate(session, p); for (auto doc : cursor) { adoptable_pets_count += doc.find("adoptableDogsCount")->get_int32(); } }
ctx := context.TODO() sess, err := client.StartSession(options.Session().SetSnapshot(true)) if err != nil { return err } defer sess.EndSession(ctx) var adoptablePetsCount int32 err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the adoptable cats const adoptableCatsOutput = "adoptableCatsCount" cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"adoptable", true}}}}, bson.D{{"$count", adoptableCatsOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp := cursor.Current.Lookup(adoptableCatsOutput) adoptableCatsCount, ok := resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", adoptableCatsOutput, cursor.Current) } adoptablePetsCount += adoptableCatsCount // Count the adoptable dogs const adoptableDogsOutput = "adoptableDogsCount" cursor, err = db.Collection("dogs").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"adoptable", true}}}}, bson.D{{"$count", adoptableDogsOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp = cursor.Current.Lookup(adoptableDogsOutput) adoptableDogsCount, ok := resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", adoptableDogsOutput, cursor.Current) } adoptablePetsCount += adoptableDogsCount return nil }) if err != nil { return err }
db = client.pets async with await client.start_session(snapshot=True) as s: adoptablePetsCount = 0 docs = await db.cats.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s ).to_list(None) adoptablePetsCount = docs[0]["adoptableCatsCount"] docs = await db.dogs.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s ).to_list(None) adoptablePetsCount += docs[0]["adoptableDogsCount"] print(adoptablePetsCount)
$catsCollection = $client->selectCollection('pets', 'cats'); $dogsCollection = $client->selectCollection('pets', 'dogs'); $session = $client->startSession(['snapshot' => true]); $adoptablePetsCount = $catsCollection->aggregate( [ ['$match' => ['adoptable' => true]], ['$count' => 'adoptableCatsCount'], ], ['session' => $session], )->toArray()[0]->adoptableCatsCount; $adoptablePetsCount += $dogsCollection->aggregate( [ ['$match' => ['adoptable' => true]], ['$count' => 'adoptableDogsCount'], ], ['session' => $session], )->toArray()[0]->adoptableDogsCount; var_dump($adoptablePetsCount);
db = client.pets with client.start_session(snapshot=True) as s: adoptablePetsCount = db.cats.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s ).next()["adoptableCatsCount"] adoptablePetsCount += db.dogs.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s ).next()["adoptableDogsCount"] print(adoptablePetsCount)
client = Mongo::Client.new(uri_string, database: "pets") client.start_session(snapshot: true) do |session| adoptable_pets_count = client['cats'].aggregate([ { "$match": { "adoptable": true } }, { "$count": "adoptable_cats_count" } ], session: session).first["adoptable_cats_count"] adoptable_pets_count += client['dogs'].aggregate([ { "$match": { "adoptable": true } }, { "$count": "adoptable_dogs_count" } ], session: session).first["adoptable_dogs_count"] puts adoptable_pets_count end
前面的一系列命令:
使用
MongoClient()
建立与 MongoDB 部署的连接。切换到
pets
数据库。建立会话。该命令指定
snapshot=True
,因此会话使用读关注"snapshot"
。对
pets
数据库中的每个集合执行以下操作:打印
adoptablePetsCount
变量。
会话中的所有查询都会读取同一时间点出现的数据。 因此,最终计数反映了数据的一致快照。
注意
如果会话持续时间超过WiredTiger历史记录保留期(默认为300秒),则查询出错并显示SnapshotTooOld
错误。 要学习;了解如何配置快照保留和启用长时间运行的查询,请参阅配置快照保留。
从过去某个时间点的数据的一致状态读取
读关注 "snapshot"
确保您的查询读取最近某个时间点出现的数据。
一家网上鞋店有一个 sales
集合,其中包含该店出售的每件商品的数据。例如,sales
集合中的文档如下所示:
{ "shoeType": "boot", "price": 30, "saleDate": ISODate("2022-02-02T06:01:17.171Z") }
每天午夜,系统都会运行查询以查看当天售出了多少双鞋。 每日销售查询如下所示:
mongoc_client_session_t *cs = NULL; mongoc_collection_t *sales_collection = NULL; bson_error_t error; mongoc_session_opt_t *session_opts; bson_t *pipeline = NULL; bson_t opts = BSON_INITIALIZER; mongoc_cursor_t *cursor = NULL; const bson_t *doc = NULL; bool ok = true; bson_iter_t iter; int64_t total_sales = 0; sales_collection = mongoc_client_get_collection (client, "retail", "sales"); /* seed 'retail.sales' with example data */ if (!retail_setup (sales_collection)) { goto cleanup; } /* start a snapshot session */ session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_snapshot (session_opts, true); cs = mongoc_client_start_session (client, session_opts, &error); mongoc_session_opts_destroy (session_opts); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); goto cleanup; } if (!mongoc_client_session_append (cs, &opts, &error)) { MONGOC_ERROR ("could not apply session options: %s", error.message); goto cleanup; } pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "$expr", "{", "$gt", "[", "$saleDate", "{", "$dateSubtract", "{", "startDate", "$$NOW", "unit", BCON_UTF8 ("day"), "amount", BCON_INT64 (1), "}", "}", "]", "}", "}", "}", "{", "$count", BCON_UTF8 ("totalDailySales"), "}", "]"); cursor = mongoc_collection_aggregate (sales_collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL); bson_destroy (&opts); ok = mongoc_cursor_next (cursor, &doc); if (mongoc_cursor_error (cursor, &error)) { MONGOC_ERROR ("could not get totalDailySales: %s", error.message); goto cleanup; } if (!ok) { MONGOC_ERROR ("%s", "cursor has no results"); goto cleanup; } ok = bson_iter_init_find (&iter, doc, "totalDailySales"); if (ok) { total_sales = bson_iter_as_int64 (&iter); } else { MONGOC_ERROR ("%s", "missing key: 'totalDailySales'"); goto cleanup; }
ctx := context.TODO() sess, err := client.StartSession(options.Session().SetSnapshot(true)) if err != nil { return err } defer sess.EndSession(ctx) var totalDailySales int32 err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the total daily sales const totalDailySalesOutput = "totalDailySales" cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"$expr", bson.D{{"$gt", bson.A{"$saleDate", bson.D{{"$dateSubtract", bson.D{ {"startDate", "$$NOW"}, {"unit", "day"}, {"amount", 1}, }, }}, }, }}, }}, }}, bson.D{{"$count", totalDailySalesOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp := cursor.Current.Lookup(totalDailySalesOutput) var ok bool totalDailySales, ok = resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", totalDailySalesOutput, cursor.Current) } return nil }) if err != nil { return err }
db = client.retail async with await client.start_session(snapshot=True) as s: docs = await db.sales.aggregate( [ { "$match": { "$expr": { "$gt": [ "$saleDate", { "$dateSubtract": { "startDate": "$$NOW", "unit": "day", "amount": 1, } }, ] } } }, {"$count": "totalDailySales"}, ], session=s, ).to_list(None) total = docs[0]["totalDailySales"] print(total)
$salesCollection = $client->selectCollection('retail', 'sales'); $session = $client->startSession(['snapshot' => true]); $totalDailySales = $salesCollection->aggregate( [ [ '$match' => [ '$expr' => [ '$gt' => ['$saleDate', [ '$dateSubtract' => [ 'startDate' => '$$NOW', 'unit' => 'day', 'amount' => 1, ], ], ], ], ], ], ['$count' => 'totalDailySales'], ], ['session' => $session], )->toArray()[0]->totalDailySales;
db = client.retail with client.start_session(snapshot=True) as s: db.sales.aggregate( [ { "$match": { "$expr": { "$gt": [ "$saleDate", { "$dateSubtract": { "startDate": "$$NOW", "unit": "day", "amount": 1, } }, ] } } }, {"$count": "totalDailySales"}, ], session=s, ).next()["totalDailySales"]
前面的查询:
使用
$gt
操作符和$dateSubtract
表达式返回saleDate
于执行查询前一天的文档。使用
$count
返回匹配文档的计数。 计数存储在totalDailySales
变量中。指定读关注(read concern)
"snapshot"
以确保查询从单个时间点读取。
sales
集合非常大,因此此查询可能需要几分钟才能完成运行。由于商店是在线的,一天中的任何时间都可以进行销售。
例如,考虑如果:
查询于12 : 00 AM 开始执行。
某客户于12 : 02 AM 购买了三双鞋。
查询于12 : 04 AM 结束执行。
如果查询不使用读关注 "snapshot"
,则在查询开始和查询结束之间发生的销售额可以计入查询次数,尽管这些销售额不是在报告当天发生的。这可能导致报告不准确,一些销售额会被重复计算。
通过指定读关注(read concern)"snapshot"
,查询仅返回在查询开始执行前不久数据库中存在的数据。
注意
如果查询花费的时间超过WiredTiger历史记录保留期(默认为300秒),则查询出错并显示SnapshotTooOld
错误。 要学习;了解如何配置快照保留和启用长时间运行的查询,请参阅配置快照保留。
配置快照保留
默认情况下, WiredTiger storage engine将历史记录保留 300 秒。 从会话中第一个操作到最后一个操作,您可以使用带有snapshot=true
的会话总共300秒。 如果使用该会话的时间较长,则会话将失败并显示SnapshotTooOld
错误。 同样,如果使用读关注"snapshot"
查询数据,并且查询持续时间超过300秒,则查询将失败。
如果查询或会话运行时间超过300秒,请考虑增加快照保留期。 要延长保留期,请修改minSnapshotHistoryWindowInSeconds
参数。
示例,此命令将minSnapshotHistoryWindowInSeconds
的值设置为600秒:
db.adminCommand( { setParameter: 1, minSnapshotHistoryWindowInSeconds: 600 } )
重要
磁盘空间和历史记录
增大 minSnapshotHistoryWindowInSeconds
的值会增加磁盘使用量,因为服务器必须在指定的时间窗口内维护早期已修改值的历史记录。使用的磁盘空间量取决于工作负载,工作负载量越大,需要的磁盘空间越多。