長時間実行スナップショット クエリの実行
スナップショット クエリを使用すると、直近の特定の点に表示されるデータを読み取ることができます。
MongoDB 5.0以降では、読み取り保証 "snapshot"
を使用してセカンダリノード上のデータをクエリできます。 この機能により、アプリケーションの読み取りの柔軟性と回復力が高まります。 データの静的コピーを作成し、別のシステムに移動し、これらの長時間実行クエリを運用ワークロードの中断から手動で分離する必要はありません。 代わりに、実行中のトランザクション データベースに対して長時間実行クエリを実行しながら、一貫した状態のデータから読み取りを行うことができます。
セカンダリ ノードで読み取り保証"snapshot"
を使用しても、アプリケーションの書込みワークロードには影響しません。 アプリケーション読み取りのみ、実行時間が長いクエリをセカンダリに分離することでメリットが得られます。
以下の場合は、スナップショット クエリを使用します。
関連する複数のクエリを実行し、各クエリが同じ時点からデータを読み込むようにします。
過去のある点のデータが一貫した状態で読み取られていることを確認してください。
ローカル読み取り保証とスナップショット読み取り保証の比較
MongoDB がデフォルトの"local"
読み取り保証 (read concern) を使用して長時間実行クエリを実行する場合、クエリ結果にはクエリと同時に発生した書込み (write) によるデータが含まれる場合があります。 その結果、クエリは予期しない結果や一貫性のない結果を返すことがあります。
このシナリオを回避するには、セッションを作成し、読み取り保証(read concern "snapshot"
を指定します。 読み取り保証 (read concern) "snapshot"
を使用すると、MongoDB はスナップショット分離でクエリを実行します。つまり、クエリは直近の特定の点に表示されたデータを読み取ります。
例
このページの例は、スナップショット クエリを使用して次の方法を実行する方法を示しています。
同じ時点から関連クエリを実行
読み取り保証"snapshot"
を使用すると、セッション内で複数の関連クエリを実行し、各クエリが同じ時点からデータを読み取ることができます。
犬の管理では、各タイプのコレクションを含むpets
データベースがあります。 pets
データベースには以下のコレクションがあります。
cats
dogs
各コレクション内の各ドキュメントには、犬が保護可能かどうかを示すadoptable
フィールドが含まれています。 たとえば、 cats
コレクション内のドキュメントは次のようになります。
{ "name": "Whiskers", "color": "white", "age": 10, "adoptable": true }
クエリを実行して、すべてのコレクションにわたる受け入れる可能性のある犬の合計数を確認したいとします。 データの一貫したビューを提供するには、各コレクションから返されるデータが単一の時点のものであることを確認する必要があります。
この目的を実現するには、セッション内で読み取り保証(read concern) "snapshot"
を使用します。
mongoc_client_session_t *cs = NULL; mongoc_collection_t *cats_collection = NULL; mongoc_collection_t *dogs_collection = NULL; int64_t adoptable_pets_count = 0; bson_error_t error; mongoc_session_opt_t *session_opts; cats_collection = mongoc_client_get_collection (client, "pets", "cats"); dogs_collection = mongoc_client_get_collection (client, "pets", "dogs"); /* Seed 'pets.cats' and 'pets.dogs' with example data */ if (!pet_setup (cats_collection, dogs_collection)) { goto cleanup; } /* start a snapshot session */ session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_snapshot (session_opts, true); cs = mongoc_client_start_session (client, session_opts, &error); mongoc_session_opts_destroy (session_opts); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); goto cleanup; } /* * Perform the following aggregation pipeline, and accumulate the count in * `adoptable_pets_count`. * * adoptablePetsCount = db.cats.aggregate( * [ { "$match": { "adoptable": true } }, * { "$count": "adoptableCatsCount" } ], session=s * ).next()["adoptableCatsCount"] * * adoptablePetsCount += db.dogs.aggregate( * [ { "$match": { "adoptable": True} }, * { "$count": "adoptableDogsCount" } ], session=s * ).next()["adoptableDogsCount"] * * Remember in order to apply the client session to * this operation, you must append the client session to the options passed * to `mongoc_collection_aggregate`, i.e., * * mongoc_client_session_append (cs, &opts, &error); * cursor = mongoc_collection_aggregate ( * collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL); */ accumulate_adoptable_count (cs, cats_collection, &adoptable_pets_count); accumulate_adoptable_count (cs, dogs_collection, &adoptable_pets_count); printf ("there are %" PRId64 " adoptable pets\n", adoptable_pets_count);
using namespace mongocxx; using bsoncxx::builder::basic::kvp; using bsoncxx::builder::basic::make_document; auto db = client["pets"]; int64_t adoptable_pets_count = 0; auto opts = mongocxx::options::client_session{}; opts.snapshot(true); auto session = client.start_session(opts); { pipeline p; p.match(make_document(kvp("adoptable", true))).count("adoptableCatsCount"); auto cursor = db["cats"].aggregate(session, p); for (auto doc : cursor) { adoptable_pets_count += doc.find("adoptableCatsCount")->get_int32(); } } { pipeline p; p.match(make_document(kvp("adoptable", true))).count("adoptableDogsCount"); auto cursor = db["dogs"].aggregate(session, p); for (auto doc : cursor) { adoptable_pets_count += doc.find("adoptableDogsCount")->get_int32(); } }
ctx := context.TODO() sess, err := client.StartSession(options.Session().SetSnapshot(true)) if err != nil { return err } defer sess.EndSession(ctx) var adoptablePetsCount int32 err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the adoptable cats const adoptableCatsOutput = "adoptableCatsCount" cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"adoptable", true}}}}, bson.D{{"$count", adoptableCatsOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp := cursor.Current.Lookup(adoptableCatsOutput) adoptableCatsCount, ok := resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", adoptableCatsOutput, cursor.Current) } adoptablePetsCount += adoptableCatsCount // Count the adoptable dogs const adoptableDogsOutput = "adoptableDogsCount" cursor, err = db.Collection("dogs").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"adoptable", true}}}}, bson.D{{"$count", adoptableDogsOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp = cursor.Current.Lookup(adoptableDogsOutput) adoptableDogsCount, ok := resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", adoptableDogsOutput, cursor.Current) } adoptablePetsCount += adoptableDogsCount return nil }) if err != nil { return err }
db = client.pets async with await client.start_session(snapshot=True) as s: adoptablePetsCount = 0 docs = await db.cats.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s ).to_list(None) adoptablePetsCount = docs[0]["adoptableCatsCount"] docs = await db.dogs.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s ).to_list(None) adoptablePetsCount += docs[0]["adoptableDogsCount"] print(adoptablePetsCount)
$catsCollection = $client->selectCollection('pets', 'cats'); $dogsCollection = $client->selectCollection('pets', 'dogs'); $session = $client->startSession(['snapshot' => true]); $adoptablePetsCount = $catsCollection->aggregate( [ ['$match' => ['adoptable' => true]], ['$count' => 'adoptableCatsCount'], ], ['session' => $session], )->toArray()[0]->adoptableCatsCount; $adoptablePetsCount += $dogsCollection->aggregate( [ ['$match' => ['adoptable' => true]], ['$count' => 'adoptableDogsCount'], ], ['session' => $session], )->toArray()[0]->adoptableDogsCount; var_dump($adoptablePetsCount);
db = client.pets with client.start_session(snapshot=True) as s: adoptablePetsCount = db.cats.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s ).next()["adoptableCatsCount"] adoptablePetsCount += db.dogs.aggregate( [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s ).next()["adoptableDogsCount"] print(adoptablePetsCount)
client = Mongo::Client.new(uri_string, database: "pets") client.start_session(snapshot: true) do |session| adoptable_pets_count = client['cats'].aggregate([ { "$match": { "adoptable": true } }, { "$count": "adoptable_cats_count" } ], session: session).first["adoptable_cats_count"] adoptable_pets_count += client['dogs'].aggregate([ { "$match": { "adoptable": true } }, { "$count": "adoptable_dogs_count" } ], session: session).first["adoptable_dogs_count"] puts adoptable_pets_count end
上記の一連のコマンドでは、次のようになります。
MongoDB 配置への接続を確立するには、
MongoClient()
を使用します。pets
データベースに切り替えます。セッションを確立します。 コマンドは
snapshot=True
を指定しているため、セッションは読み取り保証"snapshot"
を使用します。pets
データベース内の各コレクションに対してこれらのアクションを実行します。adoptablePetsCount
変数を出力します。
セッション内のすべてのクエリは、同じ時点に表示されたデータを読み取ります。 その結果、最終カウントには、データの一貫したスナップショットが反映されます。
注意
セッションが WiredTiger 履歴保持期間(デフォルトでは300秒)を超えて続く場合、クエリはSnapshotTooOld
エラーでエラーします。 スナップショット保持を構成し、長時間実行クエリを有効にする方法については、「スナップショット保持の構成 」を参照してください。
過去のある点のデータの一貫した状態からの読み取り
読み取り保証"snapshot"
を使用すると、クエリが直近の特定の点に表示されるデータが読み取られるようになります。
オンライン ショップには、店舗で販売されている各アイテムのデータが含まれるsales
コレクションがあります。 たとえば、 sales
コレクション内のドキュメントは次のようになります。
{ "shoeType": "boot", "price": 30, "saleDate": ISODate("2022-02-02T06:01:17.171Z") }
毎日の午前 0 時、クエリが実行され、その日に販売された服の数を確認できます。 日次売上クエリは次のようになります。
mongoc_client_session_t *cs = NULL; mongoc_collection_t *sales_collection = NULL; bson_error_t error; mongoc_session_opt_t *session_opts; bson_t *pipeline = NULL; bson_t opts = BSON_INITIALIZER; mongoc_cursor_t *cursor = NULL; const bson_t *doc = NULL; bool ok = true; bson_iter_t iter; int64_t total_sales = 0; sales_collection = mongoc_client_get_collection (client, "retail", "sales"); /* seed 'retail.sales' with example data */ if (!retail_setup (sales_collection)) { goto cleanup; } /* start a snapshot session */ session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_snapshot (session_opts, true); cs = mongoc_client_start_session (client, session_opts, &error); mongoc_session_opts_destroy (session_opts); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); goto cleanup; } if (!mongoc_client_session_append (cs, &opts, &error)) { MONGOC_ERROR ("could not apply session options: %s", error.message); goto cleanup; } pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "$expr", "{", "$gt", "[", "$saleDate", "{", "$dateSubtract", "{", "startDate", "$$NOW", "unit", BCON_UTF8 ("day"), "amount", BCON_INT64 (1), "}", "}", "]", "}", "}", "}", "{", "$count", BCON_UTF8 ("totalDailySales"), "}", "]"); cursor = mongoc_collection_aggregate (sales_collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL); bson_destroy (&opts); ok = mongoc_cursor_next (cursor, &doc); if (mongoc_cursor_error (cursor, &error)) { MONGOC_ERROR ("could not get totalDailySales: %s", error.message); goto cleanup; } if (!ok) { MONGOC_ERROR ("%s", "cursor has no results"); goto cleanup; } ok = bson_iter_init_find (&iter, doc, "totalDailySales"); if (ok) { total_sales = bson_iter_as_int64 (&iter); } else { MONGOC_ERROR ("%s", "missing key: 'totalDailySales'"); goto cleanup; }
ctx := context.TODO() sess, err := client.StartSession(options.Session().SetSnapshot(true)) if err != nil { return err } defer sess.EndSession(ctx) var totalDailySales int32 err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the total daily sales const totalDailySalesOutput = "totalDailySales" cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{ bson.D{{"$match", bson.D{{"$expr", bson.D{{"$gt", bson.A{"$saleDate", bson.D{{"$dateSubtract", bson.D{ {"startDate", "$$NOW"}, {"unit", "day"}, {"amount", 1}, }, }}, }, }}, }}, }}, bson.D{{"$count", totalDailySalesOutput}}, }) if err != nil { return err } if !cursor.Next(ctx) { return fmt.Errorf("expected aggregate to return a document, but got none") } resp := cursor.Current.Lookup(totalDailySalesOutput) var ok bool totalDailySales, ok = resp.Int32OK() if !ok { return fmt.Errorf("failed to find int32 field %q in document %v", totalDailySalesOutput, cursor.Current) } return nil }) if err != nil { return err }
db = client.retail async with await client.start_session(snapshot=True) as s: docs = await db.sales.aggregate( [ { "$match": { "$expr": { "$gt": [ "$saleDate", { "$dateSubtract": { "startDate": "$$NOW", "unit": "day", "amount": 1, } }, ] } } }, {"$count": "totalDailySales"}, ], session=s, ).to_list(None) total = docs[0]["totalDailySales"] print(total)
$salesCollection = $client->selectCollection('retail', 'sales'); $session = $client->startSession(['snapshot' => true]); $totalDailySales = $salesCollection->aggregate( [ [ '$match' => [ '$expr' => [ '$gt' => ['$saleDate', [ '$dateSubtract' => [ 'startDate' => '$$NOW', 'unit' => 'day', 'amount' => 1, ], ], ], ], ], ], ['$count' => 'totalDailySales'], ], ['session' => $session], )->toArray()[0]->totalDailySales;
db = client.retail with client.start_session(snapshot=True) as s: db.sales.aggregate( [ { "$match": { "$expr": { "$gt": [ "$saleDate", { "$dateSubtract": { "startDate": "$$NOW", "unit": "day", "amount": 1, } }, ] } } }, {"$count": "totalDailySales"}, ], session=s, ).next()["totalDailySales"]
上記のクエリでは、
$gt
演算子と$dateSubtract
式を使用して、クエリが実行される 1 日前よりも前にsaleDate
が より大きいドキュメントを返します。一致するドキュメントの数を返すには、
$count
を使用します。 カウントはtotalDailySales
変数に保存されます。読み取り保証(read concern
"snapshot"
を指定して、クエリが単一の時点から読み取られるようにします。
sales
コレクションはかなり大きいため、このクエリの実行には数分かかる場合があります。 この店はオンラインであるため、いつでも売上が発生する可能性があります。
たとえば、次の場合について考えてみましょう。
クエリは、午前12 : 00に実行を開始します。
あるカスタマーが、 12 : 02に 3 ペアのキーを購入します。
クエリの実行は、 12 : 04に実行を終了します。
クエリが読み取り保証 (read concern) "snapshot"
を使用していない場合、クエリが開始してから終了するまでの間に発生する売上は、レポートが発行される日に発生していなくても、クエリ数に含めることができます。 そのため、一部の売上が 2 回カウントされるなど、不正確なレポートが表示される可能性があります。
読み取り保証 (read concern) "snapshot"
を指定すると、クエリは、クエリの実行が開始される直前の時点でデータベースに存在していたデータのみを返します。
注意
クエリに WiredTiger 履歴保持期間(デフォルトでは300秒)よりも長い時間がかかる場合、クエリはSnapshotTooOld
エラーでエラーになります。 スナップショット保持を構成し、長時間実行クエリを有効にする方法については、「スナップショット保持の構成 」を参照してください。
スナップショット保持の構成
デフォルトでは、WiredTiger のストレージエンジンは履歴を300秒保持します。 snapshot=true
とのセッションは、セッションの最初の操作から最後の操作まで合計で300秒にわたって使用できます。 セッションを長時間使用すると、セッションは失敗し、 SnapshotTooOld
エラーが発生します。 同様に、読み取り保証(read concern "snapshot"
を使用してデータをクエリし、クエリが300秒を超えると、クエリは失敗します。
クエリまたはセッションが300秒を超えて実行される場合は、スナップショット保持期間を増やすことを検討してください。 保持期間を延長するには、 minSnapshotHistoryWindowInSeconds
パラメータを変更します。
たとえば、次のコマンドは、 minSnapshotHistoryWindowInSeconds
の値を600秒に設定します。
db.adminCommand( { setParameter: 1, minSnapshotHistoryWindowInSeconds: 600 } )
重要
ディスク領域と履歴
minSnapshotHistoryWindowInSeconds
の値を増やすと、サーバーは指定された時間枠内で古い変更値の履歴を維持する必要があるため、ディスク使用量が増加します。使用されるディスク容量はワークロードによって異なり、ワークロードのボリュームが大きいほど、より多くのディスク容量が必要になります。