ドライバー API
Callback API と Core API
トランザクションを開始し、指定された操作を実行し、コミット(エラーの場合は中止)します。
TransientTransactionError
およびUnknownTransactionCommitResult
のエラー処理ロジックが自動的に組み込まれます。
トランザクションを開始し、トランザクションをコミットするには、明示的な呼び出しが必要です。
TransientTransactionError
およびUnknownTransactionCommitResult
のエラー処理ロジックを組み込まず、代わりにこれらのエラーに対するカスタム エラー処理を組み込む柔軟性を提供します。
Callback API
Callback API には次のロジックが組み込まれています。
トランザクションで
TransientTransactionError
エラーが発生した場合にトランザクション全体を再試行する方法。コミットで
UnknownTransactionCommitResult
エラーが発生した場合にコミット操作を再試行する方法
MongoDB 6.0.5以降、サーバーはTransactionTooLargeForCache
エラーを受信してもトランザクションを再試行しません。
例
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
static bool with_transaction_example (bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_write_concern_t *wc = NULL; mongoc_collection_t *coll = NULL; bool success = false; bool ret = false; bson_t *doc = NULL; bson_t *insert_opts = NULL; mongoc_client_session_t *session = NULL; mongoc_transaction_opt_t *txn_opts = NULL; /* For a replica set, include the replica set name and a seedlist of the * members in the URI string; e.g. * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \ * "27017/?replicaSet=myRepl"; * client = mongoc_client_new (uri_repl); * For a sharded cluster, connect to the mongos instances; e.g. * uri_sharded = * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; * client = mongoc_client_new (uri_sharded); */ client = get_client (); /* Prereq: Create collections. Note Atlas connection strings include a majority write * concern by default. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 0); insert_opts = bson_new (); mongoc_write_concern_append (wc, insert_opts); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } /* Step 1: Start a client session. */ session = mongoc_client_start_session (client, NULL /* opts */, error); if (!session) { goto fail; } /* Step 2: Optional. Define options to use for the transaction. */ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_write_concern (txn_opts, wc); /* Step 3: Use mongoc_client_session_with_transaction to start a transaction, * execute the callback, and commit (or abort on error). */ ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error); if (!ret) { goto fail; } success = true; fail: bson_destroy (doc); mongoc_collection_destroy (coll); bson_destroy (insert_opts); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); mongoc_client_session_destroy (session); mongoc_client_destroy (client); return success; } /* Define the callback that specifies the sequence of operations to perform * inside the transactions. */ static bool callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_collection_t *coll = NULL; bson_t *doc = NULL; bool success = false; bool ret = false; BSON_UNUSED (ctx); client = mongoc_client_session_get_client (session); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (1)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (999)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } success = true; fail: mongoc_collection_destroy (coll); bson_destroy (doc); return success; }
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}}; // Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be // needed. The suggested Atlas connection string includes majority write concern by default. write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try { options::insert opts; opts.write_concern(wc_majority); foo.insert_one(make_document(kvp("abc", 0)), opts); bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred while inserting: " << e.what() << std::endl; return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) { // Important:: You must pass the session to the operations. foo.insert_one(*session, make_document(kvp("abc", 1))); bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try { options::transaction opts; opts.write_concern(wc_majority); session.with_transaction(callback, opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS;
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) { // Step 2: Optional. Define options to use for the transaction. var transactionOptions = new TransactionOptions( writeConcern: WriteConcern.WMajority); // Step 3: Define the sequence of operations to perform inside the transactions var cancellationToken = CancellationToken.None; // normally a real token would be used result = session.WithTransaction( (s, ct) => { try { collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct); collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct); } catch (MongoWriteException) { // Do something in response to the exception throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur. } return "Inserted into collections in different databases"; }, transactionOptions, cancellationToken); }
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" // For a sharded cluster, connect to the mongos instances; e.g. // uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" uri := mtest.ClusterURI() clientOpts := options.Client().ApplyURI(uri) client, err := mongo.Connect(clientOpts) if err != nil { return err } defer func() { _ = client.Disconnect(ctx) }() // Prereq: Create collections. wcMajority := writeconcern.Majority() wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. callback := func(sesctx context.Context) (interface{}, error) { // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } return nil, nil } // Step 2: Start a session and run the callback using WithTransaction. session, err := client.StartSession() if err != nil { return err } defer session.EndSession(ctx) result, err := session.WithTransaction(ctx, callback) if err != nil { return err } log.Printf("result: %v\n", result) return nil }
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /* Create collections. */ client.getDatabase("mydb1").getCollection("foo") .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar") .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.LOCAL) .writeConcern(WriteConcern.MAJORITY) .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() { public String execute() { MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo"); MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar"); /* Important:: You must pass the session to the operations.. */ coll1.insertOne(clientSession, new Document("abc", 1)); coll2.insertOne(clientSession, new Document("xyz", 999)); return "Inserted into collections in different databases"; } }; try { /* Step 4: Use .withTransaction() to start a transaction, execute the callback, and commit (or abort on error). */ clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) { // some error handling } finally { clientSession.close(); }
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session): collection_one = my_session.client.mydb1.foo collection_two = my_session.client.mydb2.bar # Important:: You must pass the session to the operations. await collection_one.insert_one({"abc": 1}, session=my_session) await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' const client = new MongoClient(uri); await client.connect(); // Prereq: Create collections. await client .db('mydb1') .collection('foo') .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } }); await client .db('mydb2') .collection('bar') .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } }); // Step 1: Start a Client Session const session = client.startSession(); // Step 2: Optional. Define options to use for the transaction const transactionOptions = { readPreference: 'primary', readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }; // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error) // Note: The callback for withTransaction MUST be async and/or return a Promise. try { await session.withTransaction(async () => { const coll1 = client.db('mydb1').collection('foo'); const coll2 = client.db('mydb2').collection('bar'); // Important:: You must pass the session to the operations await coll1.insertOne({ abc: 1 }, { session }); await coll2.insertOne({ xyz: 999 }, { session }); }, transactionOptions); } finally { await session.endSession(); await client.close(); }
注意
Perl ドライバーについては、代わりにCore APIの使用例を参照してください。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
/* * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' * For a sharded cluster, connect to the mongos instances; e.g. * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection( 'mydb1', 'foo', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['abc' => 0]); $client->selectCollection( 'mydb2', 'bar', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void { $client ->selectCollection('mydb1', 'foo') ->insertOne(['abc' => 1], ['session' => $session]); $client ->selectCollection('mydb2', 'bar') ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback);
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session): collection_one = session.client.mydb1.foo collection_two = session.client.mydb2.bar # Important:: You must pass the session to the operations. collection_one.insert_one({"abc": 1}, session=session) collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、 トランザクションレベルの読み取り保証(read concern ) 、トランザクションレベルの書込み保証(write concern ) 、トランザクションレベルの読み込み設定(read preference) が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。 「トランザクション内でのコレクションとインデックスの作成 」を参照してください。
この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 TransientTransactionError
またはUnknownTransactionCommitResult
のコミット エラーの再試行ロジックが組み込まれています。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session| collection_one = client.use('mydb1')['foo'] collection_two = client.use('mydb2')['bar'] # Important: You must pass the session to the operations. collection_one.insert_one({'abc': 1}, session: my_session) collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( read_concern: {level: :local}, write_concern: {w: :majority, wtimeout: 1000}, read: {mode: :primary}, &callback)
注意
Scala ドライバーについては、代わりに「 Core APIの使用例 」を参照してください。
Core API
Core transaction API には、次のラベルの付いたエラーの再試行ロジックは組み込まれていません。
TransientTransactionError
。 トランザクション内の操作によってTransientTransactionError
というラベルの付いたエラーが返された場合は、トランザクション全体を再試行できます。TransientTransactionError
を処理するには、アプリケーションはエラーの再試行ロジックを明示的に組み込む必要があります。UnknownTransactionCommitResult
。 コミットがUnknownTransactionCommitResult
というラベルの付いたエラーを返す場合は、コミットを再試行できます。UnknownTransactionCommitResult
を処理するには、アプリケーションはエラーの再試行ロジックを明示的に組み込む必要があります。
例
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
次の例には、一時的なエラーの場合はトランザクションを再試行し、不明なコミット エラーの場合はコミットを再試行するロジックが組み込まれています。
/* takes a session, an out-param for server reply, and out-param for error. */ typedef bool (*txn_func_t) (mongoc_client_session_t *, bson_t *, bson_error_t *); /* runs transactions with retry logic */ bool run_transaction_with_retry (txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error) { bson_t reply; bool r; while (true) { /* perform transaction */ r = txn_func (cs, &reply, error); if (r) { /* success */ bson_destroy (&reply); return true; } MONGOC_WARNING ("Transaction aborted: %s", error->message); if (mongoc_error_has_label (&reply, "TransientTransactionError")) { /* on transient error, retry the whole transaction */ MONGOC_WARNING ("TransientTransactionError, retrying transaction..."); bson_destroy (&reply); } else { /* non-transient error */ break; } } bson_destroy (&reply); return false; } /* commit transactions with retry logic */ bool commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error) { bson_t reply; bool r; while (true) { /* commit uses write concern set at transaction start, see * mongoc_transaction_opts_set_write_concern */ r = mongoc_client_session_commit_transaction (cs, &reply, error); if (r) { MONGOC_DEBUG ("Transaction committed"); break; } if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) { MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ..."); bson_destroy (&reply); } else { /* commit failed, cannot retry */ break; } } bson_destroy (&reply); return r; } /* updates two collections in a transaction and calls commit_with_retry */ bool update_employee_info (mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error) { mongoc_client_t *client; mongoc_collection_t *employees; mongoc_collection_t *events; mongoc_read_concern_t *rc; mongoc_write_concern_t *wc; mongoc_transaction_opt_t *txn_opts; bson_t opts = BSON_INITIALIZER; bson_t *filter = NULL; bson_t *update = NULL; bson_t *event = NULL; bool r; bson_init (reply); client = mongoc_client_session_get_client (cs); employees = mongoc_client_get_collection (client, "hr", "employees"); events = mongoc_client_get_collection (client, "reporting", "events"); rc = mongoc_read_concern_new (); mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); wc = mongoc_write_concern_new (); mongoc_write_concern_set_w ( wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_read_concern (txn_opts, rc); mongoc_transaction_opts_set_write_concern (txn_opts, wc); r = mongoc_client_session_start_transaction (cs, txn_opts, error); if (!r) { goto done; } r = mongoc_client_session_append (cs, &opts, error); if (!r) { goto done; } filter = BCON_NEW ("employee", BCON_INT32 (3)); update = BCON_NEW ("$set", "{", "status", "Inactive", "}"); /* mongoc_collection_update_one will reinitialize reply */ bson_destroy (reply); r = mongoc_collection_update_one (employees, filter, update, &opts, reply, error); if (!r) { goto abort; } event = BCON_NEW ("employee", BCON_INT32 (3)); BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}"); bson_destroy (reply); r = mongoc_collection_insert_one (events, event, &opts, reply, error); if (!r) { goto abort; } r = commit_with_retry (cs, error); abort: if (!r) { MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message); mongoc_client_session_abort_transaction (cs, NULL); } done: mongoc_collection_destroy (employees); mongoc_collection_destroy (events); mongoc_read_concern_destroy (rc); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); bson_destroy (&opts); bson_destroy (filter); bson_destroy (update); bson_destroy (event); return r; } void example_func (mongoc_client_t *client) { mongoc_client_session_t *cs; bson_error_t error; bool r; ASSERT (client); cs = mongoc_client_start_session (client, NULL, &error); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); return; } r = run_transaction_with_retry (update_employee_info, cs, &error); if (!r) { MONGOC_ERROR ("Could not update employee, permanent error: %s", error.message); } mongoc_client_session_destroy (cs); }
using transaction_func = std::function<void(client_session & session)>; auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) { while (true) { try { txn_func(session); // performs transaction. break; } catch (const operation_exception& oe) { std::cout << "Transaction aborted. Caught exception during transaction." << std::endl; // If transient error, retry the whole transaction. if (oe.has_error_label("TransientTransactionError")) { std::cout << "TransientTransactionError, retrying transaction ..." << std::endl; continue; } else { throw oe; } } } }; auto commit_with_retry = [](client_session& session) { while (true) { try { session.commit_transaction(); // Uses write concern set at transaction start. std::cout << "Transaction committed." << std::endl; break; } catch (const operation_exception& oe) { // Can retry commit if (oe.has_error_label("UnknownTransactionCommitResult")) { std::cout << "UnknownTransactionCommitResult, retrying commit operation ..." << std::endl; continue; } else { std::cout << "Error during commit ..." << std::endl; throw oe; } } } }; // Updates two collections in a transaction auto update_employee_info = [&](client_session& session) { auto& client = session.client(); auto employees = client["hr"]["employees"]; auto events = client["reporting"]["events"]; options::transaction txn_opts; read_concern rc; rc.acknowledge_level(read_concern::level::k_snapshot); txn_opts.read_concern(rc); write_concern wc; wc.acknowledge_level(write_concern::level::k_majority); txn_opts.write_concern(wc); session.start_transaction(txn_opts); try { employees.update_one( make_document(kvp("employee", 3)), make_document(kvp("$set", make_document(kvp("status", "Inactive"))))); events.insert_one(make_document( kvp("employee", 3), kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active"))))); } catch (const operation_exception& oe) { std::cout << "Caught exception during transaction, aborting." << std::endl; session.abort_transaction(); throw oe; } commit_with_retry(session); }; auto session = client.start_session(); try { run_transaction_with_retry(update_employee_info, session); } catch (const operation_exception& oe) { // Do something with error. throw oe; }
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session) { while (true) { try { txnFunc(client, session); // performs transaction break; } catch (MongoException exception) { // if transient error, retry the whole transaction if (exception.HasErrorLabel("TransientTransactionError")) { Console.WriteLine("TransientTransactionError, retrying transaction."); continue; } else { throw; } } } } public void CommitWithRetry(IClientSessionHandle session) { while (true) { try { session.CommitTransaction(); Console.WriteLine("Transaction committed."); break; } catch (MongoException exception) { // can retry commit if (exception.HasErrorLabel("UnknownTransactionCommitResult")) { Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation"); continue; } else { Console.WriteLine($"Error during commit: {exception.Message}."); throw; } } } } // updates two collections in a transaction public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session) { var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees"); var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events"); session.StartTransaction(new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority)); try { employeesCollection.UpdateOne( session, Builders<BsonDocument>.Filter.Eq("employee", 3), Builders<BsonDocument>.Update.Set("status", "Inactive")); eventsCollection.InsertOne( session, new BsonDocument { { "employee", 3 }, { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } } }); } catch (Exception exception) { Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}."); session.AbortTransaction(); throw; } CommitWithRetry(session); } public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client) { // start a session using (var session = client.StartSession()) { try { RunTransactionWithRetry(UpdateEmployeeInfo, client, session); } catch (Exception exception) { // do something with error Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}."); } } }
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error { for { err := txnFn(ctx) // Performs transaction. if err == nil { return nil } log.Println("Transaction aborted. Caught exception during transaction.") // If transient error, retry the whole transaction if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") { log.Println("TransientTransactionError, retrying transaction...") continue } return err } } commitWithRetry := func(ctx context.Context) error { sess := mongo.SessionFromContext(ctx) for { err := sess.CommitTransaction(ctx) switch e := err.(type) { case nil: log.Println("Transaction committed.") return nil case mongo.CommandError: // Can retry commit if e.HasErrorLabel("UnknownTransactionCommitResult") { log.Println("UnknownTransactionCommitResult, retrying commit operation...") continue } log.Println("Error during commit...") return e default: log.Println("Error during commit...") return e } } } // Updates two collections in a transaction. updateEmployeeInfo := func(ctx context.Context) error { employees := client.Database("hr").Collection("employees") events := client.Database("reporting").Collection("events") sess := mongo.SessionFromContext(ctx) err := sess.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.Majority()), ) if err != nil { return err } _, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) if err != nil { sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } _, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) if err != nil { sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } return commitWithRetry(ctx) } txnOpts := options.Transaction().SetReadPreference(readpref.Primary()) return client.UseSessionWithOptions( ctx, options.Session().SetDefaultTransactionOptions(txnOpts), func(ctx context.Context) error { return runTransactionWithRetry(ctx, updateEmployeeInfo) }, ) }
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
void runTransactionWithRetry(Runnable transactional) { while (true) { try { transactional.run(); break; } catch (MongoException e) { System.out.println("Transaction aborted. Caught exception during transaction."); if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { System.out.println("TransientTransactionError, aborting transaction and retrying ..."); continue; } else { throw e; } } } } void commitWithRetry(ClientSession clientSession) { while (true) { try { clientSession.commitTransaction(); System.out.println("Transaction committed"); break; } catch (MongoException e) { // can retry commit if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { System.out.println("UnknownTransactionCommitResult, retrying commit operation ..."); continue; } else { System.out.println("Exception during commit ..."); throw e; } } } } void updateEmployeeInfo() { MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees"); MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events"); TransactionOptions txnOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .build(); try (ClientSession clientSession = client.startSession()) { clientSession.startTransaction(txnOptions); employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")); eventsCollection.insertOne(clientSession, new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active"))); commitWithRetry(clientSession); } } void updateEmployeeInfoWithRetry() { runTransactionWithRetry(this::updateEmployeeInfo); }
注意
Motorについては、代わりにCallback APIを参照してください。
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
async function commitWithRetry(session) { try { await session.commitTransaction(); console.log('Transaction committed.'); } catch (error) { if (error.hasErrorLabel('UnknownTransactionCommitResult')) { console.log('UnknownTransactionCommitResult, retrying commit operation ...'); await commitWithRetry(session); } else { console.log('Error during commit ...'); throw error; } } } async function runTransactionWithRetry(txnFunc, client, session) { try { await txnFunc(client, session); } catch (error) { console.log('Transaction aborted. Caught exception during transaction.'); // If transient error, retry the whole transaction if (error.hasErrorLabel('TransientTransactionError')) { console.log('TransientTransactionError, retrying transaction ...'); await runTransactionWithRetry(txnFunc, client, session); } else { throw error; } } } async function updateEmployeeInfo(client, session) { session.startTransaction({ readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' }, readPreference: 'primary' }); const employeesCollection = client.db('hr').collection('employees'); const eventsCollection = client.db('reporting').collection('events'); await employeesCollection.updateOne( { employee: 3 }, { $set: { status: 'Inactive' } }, { session } ); await eventsCollection.insertOne( { employee: 3, status: { new: 'Inactive', old: 'Active' } }, { session } ); try { await commitWithRetry(session); } catch (error) { await session.abortTransaction(); throw error; } } return client.withSession(session => runTransactionWithRetry(updateEmployeeInfo, client, session) );
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
sub runTransactionWithRetry { my ( $txnFunc, $session ) = @_; LOOP: { eval { $txnFunc->($session); # performs transaction }; if ( my $error = $@ ) { print("Transaction aborted-> Caught exception during transaction.\n"); # If transient error, retry the whole transaction if ( $error->has_error_label("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ->..\n"); redo LOOP; } else { die $error; } } } return; } sub commitWithRetry { my ($session) = @_; LOOP: { eval { $session->commit_transaction(); # Uses write concern set at transaction start. print("Transaction committed->\n"); }; if ( my $error = $@ ) { # Can retry commit if ( $error->has_error_label("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ->..\n"); redo LOOP; } else { print("Error during commit ->..\n"); die $error; } } } return; } # Updates two collections in a transactions sub updateEmployeeInfo { my ($session) = @_; my $employeesCollection = $session->client->ns("hr.employees"); my $eventsCollection = $session->client->ns("reporting.events"); $session->start_transaction( { readConcern => { level => "snapshot" }, writeConcern => { w => "majority" }, readPreference => 'primary', } ); eval { $employeesCollection->update_one( { employee => 3 }, { '$set' => { status => "Inactive" } }, { session => $session}, ); $eventsCollection->insert_one( { employee => 3, status => { new => "Inactive", old => "Active" } }, { session => $session}, ); }; if ( my $error = $@ ) { print("Caught exception during transaction, aborting->\n"); $session->abort_transaction(); die $error; } commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval { runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) { # Do something with error } $session->end_session();
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void { while (true) { try { $txnFunc($client, $session); // performs transaction break; } catch (\MongoDB\Driver\Exception\CommandException $error) { $resultDoc = $error->getResultDocument(); // If transient error, retry the whole transaction if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) { continue; } else { throw $error; } } catch (\MongoDB\Driver\Exception\Exception $error) { throw $error; } } } private function commitWithRetry3(\MongoDB\Driver\Session $session): void { while (true) { try { $session->commitTransaction(); echo "Transaction committed.\n"; break; } catch (\MongoDB\Driver\Exception\CommandException $error) { $resultDoc = $error->getResultDocument(); if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) { echo "UnknownTransactionCommitResult, retrying commit operation ...\n"; continue; } else { echo "Error during commit ...\n"; throw $error; } } catch (\MongoDB\Driver\Exception\Exception $error) { echo "Error during commit ...\n"; throw $error; } } } private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void { $session->startTransaction([ 'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'), 'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY), 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY), ]); try { $client->hr->employees->updateOne( ['employee' => 3], ['$set' => ['status' => 'Inactive']], ['session' => $session], ); $client->reporting->events->insertOne( ['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']], ['session' => $session], ); } catch (\MongoDB\Driver\Exception\Exception $error) { echo "Caught exception during transaction, aborting.\n"; $session->abortTransaction(); throw $error; } $this->commitWithRetry3($session); } private function doUpdateEmployeeInfo(\MongoDB\Client $client): void { // Start a session. $session = $client->startSession(); try { $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session); } catch (\MongoDB\Driver\Exception\Exception) { // Do something with error } }
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
def run_transaction_with_retry(txn_func, session): while True: try: txn_func(session) # performs transaction break except (ConnectionFailure, OperationFailure) as exc: # If transient error, retry the whole transaction if exc.has_error_label("TransientTransactionError"): print("TransientTransactionError, retrying transaction ...") continue else: raise def commit_with_retry(session): while True: try: # Commit uses write concern set at transaction start. session.commit_transaction() print("Transaction committed.") break except (ConnectionFailure, OperationFailure) as exc: # Can retry commit if exc.has_error_label("UnknownTransactionCommitResult"): print("UnknownTransactionCommitResult, retrying commit operation ...") continue else: print("Error during commit ...") raise # Updates two collections in a transactions def update_employee_info(session): employees_coll = session.client.hr.employees events_coll = session.client.reporting.events with session.start_transaction( read_concern=ReadConcern("snapshot"), write_concern=WriteConcern(w="majority"), read_preference=ReadPreference.PRIMARY, ): employees_coll.update_one( {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session ) events_coll.insert_one( {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session ) commit_with_retry(session) # Start a session. with client.start_session() as session: try: run_transaction_with_retry(update_employee_info, session) except Exception: # Do something with error. raise
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
def run_transaction_with_retry(session) begin yield session # performs transaction rescue Mongo::Error => e puts 'Transaction aborted. Caught exception during transaction.' raise unless e.label?('TransientTransactionError') puts "TransientTransactionError, retrying transaction ..." retry end end def commit_with_retry(session) begin session.commit_transaction puts 'Transaction committed.' rescue Mongo::Error => e if e.label?('UnknownTransactionCommitResult') puts "UnknownTransactionCommitResult, retrying commit operation ..." retry else puts 'Error during commit ...' raise end end end # updates two collections in a transaction def update_employee_info(session) employees_coll = session.client.use(:hr)[:employees] events_coll = session.client.use(:reporting)[:events] session.start_transaction(read_concern: { level: :snapshot }, write_concern: { w: :majority }, read: {mode: :primary}) employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} }, session: session) events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } }, session: session) commit_with_retry(session) end session = client.start_session begin run_transaction_with_retry(session) do update_employee_info(session) end rescue StandardError => e # Do something with error raise end
重要
読み取り操作と書込み (write) 操作をトランザクションに関連付けるには、トランザクション内の各操作にセッションを渡す必要があります。
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
ドライバーのバージョン
トランザクションエラーの処理
MongoDB かリレーショナル データベースかにかかわらず、データベース システムに関係なく、トランザクションのコミット中にエラーを処理し、トランザクションの再試行ロジックを組み込む必要があります。
TransientTransactionError
トランザクション内の個々の書き込み操作は、 retryWrites
の値に関係なく、再試行できません。 If an operation encounters an error associated with the label "TransientTransactionError"
, such as when the primary steps down, the transaction as a whole can be retried.
Callback API には
"TransientTransactionError"
の再試行ロジックが組み込まれています。Core transaction API には
"TransientTransactionError"
の再試行ロジックは組み込まれていません。"TransientTransactionError"
を処理するには、アプリケーションはエラーの再試行ロジックを明示的に組み込む必要があります。 一時的なエラーの再試行ロジックを組み込んだ例については、「 Core API の例 」を参照してください。
UnknownTransactionCommitResult
コミット操作は、再試行可能な書込み操作です。 コミット操作でエラーが発生した場合、MongoDB ドライバーはretryWrites
の値に関係なくコミットを再試行します。
コミット操作で"UnknownTransactionCommitResult"
というラベルの付いたエラーが発生した場合は、コミットを再試行できます。
Callback API には
"UnknownTransactionCommitResult"
の再試行ロジックが組み込まれています。Core transaction API には
"UnknownTransactionCommitResult"
の再試行ロジックは組み込まれていません。"UnknownTransactionCommitResult"
を処理するには、アプリケーションはエラーの再試行ロジックを明示的に組み込む必要があります。 不明なコミット エラーの再試行ロジックを組み込んだ例については、「 Core API の例 」を参照してください。
TransactionTooLargeForCache
バージョン 6.0.5 の新機能。
MongoDB 6.0.5 以降、サーバーはTransactionTooLargeForCache
エラーを受信してもトランザクションを再試行しません。 このエラーは、キャッシュが小さすぎて再試行が失敗する可能性があることを意味します。
transactionTooLargeForCacheThreshold
しきい値のデフォルト値は0.75
です。 トランザクションがキャッシュの 75% 以上を使用している場合、サーバーはトランザクションを再試行する代わりにTransactionTooLargeForCache
を返します。
MongoDB の以前のバージョンでは、サーバーは ではなくTemporarilyUnavailable
WriteConflict
またはTransactionTooLargeForCache
を返します。
エラーしきい値を変更するには、 setParameter
コマンドを使用します。
詳細情報
mongosh
例
次のmongosh
メソッドはトランザクションに利用できます。
注意
mongosh
の例では、わかりやすくするために再試行ロジックと堅牢なエラー処理を省略しています。 アプリケーションにトランザクションを含めるより具体的な例については、代わりに「トランザクション エラーの処理」を参照してください。
// Create collections: db.getSiblingDB("mydb1").foo.insertOne( {abc: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } ) db.getSiblingDB("mydb2").bar.insertOne( {xyz: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } ) // Start a session. session = db.getMongo().startSession( { readPreference: { mode: "primary" } } ); coll1 = session.getDatabase("mydb1").foo; coll2 = session.getDatabase("mydb2").bar; // Start a transaction session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } ); // Operations inside the transaction try { coll1.insertOne( { abc: 1 } ); coll2.insertOne( { xyz: 999 } ); } catch (error) { // Abort transaction on error session.abortTransaction(); throw error; } // Commit the transaction using write concern set at transaction start session.commitTransaction(); session.endSession();