드라이버 API
Callback API와 Core API
트랜잭션을 시작하고, 지정된 작업을 실행하며, 커밋(또는 오류 발생 시 중단)합니다.
TransientTransactionError
및UnknownTransactionCommitResult
에 대한 오류 처리 로직을 자동으로 통합합니다.
트랜잭션을 시작하고 커밋하려면 명시적인 호출이 필요합니다.
TransientTransactionError
및UnknownTransactionCommitResult
에 대한 오류 처리 로직을 통합하지 않고 대신 이러한 오류에 대한 사용자 지정 오류 처리를 통합할 수 있는 유연성을 제공합니다.
Callback API
콜백 API는 로직을 통합합니다.
트랜잭션에
TransientTransactionError
오류가 발생하면 트랜잭션 전체를 다시 시도합니다.커밋에
UnknownTransactionCommitResult
오류가 발생하면 커밋 작업을 다시 시도합니다.
MongoDB 6.2부터 서버는 TransactionTooLargeForCache
오류를 수신하는 경우 트랜잭션을 재시도하지 않습니다.
예시
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션 의 작업은 트랜잭션 수준 읽기 고려 (read concern), 트랜잭션 수준 쓰기 고려 (write concern) 및 트랜잭션 수준 읽기 설정 (read preference) 을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
static bool with_transaction_example (bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_write_concern_t *wc = NULL; mongoc_collection_t *coll = NULL; bool success = false; bool ret = false; bson_t *doc = NULL; bson_t *insert_opts = NULL; mongoc_client_session_t *session = NULL; mongoc_transaction_opt_t *txn_opts = NULL; /* For a replica set, include the replica set name and a seedlist of the * members in the URI string; e.g. * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \ * "27017/?replicaSet=myRepl"; * client = mongoc_client_new (uri_repl); * For a sharded cluster, connect to the mongos instances; e.g. * uri_sharded = * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; * client = mongoc_client_new (uri_sharded); */ client = get_client (); /* Prereq: Create collections. Note Atlas connection strings include a majority write * concern by default. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 0); insert_opts = bson_new (); mongoc_write_concern_append (wc, insert_opts); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } /* Step 1: Start a client session. */ session = mongoc_client_start_session (client, NULL /* opts */, error); if (!session) { goto fail; } /* Step 2: Optional. Define options to use for the transaction. */ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_write_concern (txn_opts, wc); /* Step 3: Use mongoc_client_session_with_transaction to start a transaction, * execute the callback, and commit (or abort on error). */ ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error); if (!ret) { goto fail; } success = true; fail: bson_destroy (doc); mongoc_collection_destroy (coll); bson_destroy (insert_opts); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); mongoc_client_session_destroy (session); mongoc_client_destroy (client); return success; } /* Define the callback that specifies the sequence of operations to perform * inside the transactions. */ static bool callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_collection_t *coll = NULL; bson_t *doc = NULL; bool success = false; bool ret = false; BSON_UNUSED (ctx); client = mongoc_client_session_get_client (session); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (1)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (999)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } success = true; fail: mongoc_collection_destroy (coll); bson_destroy (doc); return success; }
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}}; // Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be // needed. The suggested Atlas connection string includes majority write concern by default. write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try { options::insert opts; opts.write_concern(wc_majority); foo.insert_one(make_document(kvp("abc", 0)), opts); bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (mongocxx::exception const& e) { std::cout << "An exception occurred while inserting: " << e.what() << std::endl; return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) { // Important:: You must pass the session to the operations. foo.insert_one(*session, make_document(kvp("abc", 1))); bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try { options::transaction opts; opts.write_concern(wc_majority); session.with_transaction(callback, opts); } catch (mongocxx::exception const& e) { std::cout << "An exception occurred: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS;
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) { // Step 2: Optional. Define options to use for the transaction. var transactionOptions = new TransactionOptions( writeConcern: WriteConcern.WMajority); // Step 3: Define the sequence of operations to perform inside the transactions var cancellationToken = CancellationToken.None; // normally a real token would be used result = session.WithTransaction( (s, ct) => { try { collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct); collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct); } catch (MongoWriteException) { // Do something in response to the exception throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur. } return "Inserted into collections in different databases"; }, transactionOptions, cancellationToken); }
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" // For a sharded cluster, connect to the mongos instances; e.g. // uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" uri := mtest.ClusterURI() clientOpts := options.Client().ApplyURI(uri) client, err := mongo.Connect(clientOpts) if err != nil { return err } defer func() { _ = client.Disconnect(ctx) }() // Prereq: Create collections. wcMajority := writeconcern.Majority() wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. callback := func(sesctx context.Context) (interface{}, error) { // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } return nil, nil } // Step 2: Start a session and run the callback using WithTransaction. session, err := client.StartSession() if err != nil { return err } defer session.EndSession(ctx) result, err := session.WithTransaction(ctx, callback) if err != nil { return err } log.Printf("result: %v\n", result) return nil }
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /* Create collections. */ client.getDatabase("mydb1").getCollection("foo") .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar") .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.LOCAL) .writeConcern(WriteConcern.MAJORITY) .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() { public String execute() { MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo"); MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar"); /* Important:: You must pass the session to the operations.. */ coll1.insertOne(clientSession, new Document("abc", 1)); coll2.insertOne(clientSession, new Document("xyz", 999)); return "Inserted into collections in different databases"; } }; try { /* Step 4: Use .withTransaction() to start a transaction, execute the callback, and commit (or abort on error). */ clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) { // some error handling } finally { clientSession.close(); }
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session): collection_one = my_session.client.mydb1.foo collection_two = my_session.client.mydb2.bar # Important:: You must pass the session to the operations. await collection_one.insert_one({"abc": 1}, session=my_session) await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' const client = new MongoClient(uri); await client.connect(); // Prereq: Create collections. await client .db('mydb1') .collection('foo') .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } }); await client .db('mydb2') .collection('bar') .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } }); // Step 1: Start a Client Session const session = client.startSession(); // Step 2: Optional. Define options to use for the transaction const transactionOptions = { readPreference: 'primary', readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }; // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error) // Note: The callback for withTransaction MUST be async and/or return a Promise. try { await session.withTransaction(async () => { const coll1 = client.db('mydb1').collection('foo'); const coll2 = client.db('mydb2').collection('bar'); // Important:: You must pass the session to the operations await coll1.insertOne({ abc: 1 }, { session }); await coll2.insertOne({ xyz: 999 }, { session }); }, transactionOptions); } finally { await session.endSession(); await client.close(); }
참고
Perl (Perl) 운전자 의 경우 대신 Core API 사용 예시 를 참조하세요.
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
/* * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' * For a sharded cluster, connect to the mongos instances; e.g. * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection( 'mydb1', 'foo', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['abc' => 0]); $client->selectCollection( 'mydb2', 'bar', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void { $client ->selectCollection('mydb1', 'foo') ->insertOne(['abc' => 1], ['session' => $session]); $client ->selectCollection('mydb2', 'bar') ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback);
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session): collection_one = session.client.mydb1.foo collection_two = session.client.mydb2.bar # Important:: You must pass the session to the operations. collection_one.insert_one({"abc": 1}, session=session) collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
중요
MongoDB 버전에 맞는 MongoDB 드라이버를 사용하세요.
드라이버를 사용할 때 트랜잭션의 각 작업은 각 작업에 세션을 전달해야 합니다.
트랜잭션의 작업은 트랜잭션 수준의 읽기 고려, 트랜잭션 수준의 쓰기 고려 및 트랜잭션 수준의 읽기 설정을 사용합니다.
트랜잭션에서 암시적 또는 명시적으로 컬렉션을 만들 수 있습니다. 트랜잭션에서 컬렉션 및 인덱스 생성을 참조하세요.
이 예시에서는 트랜잭션 작업에 새로운 콜백 API를 사용합니다. 이 API는 트랜잭션을 시작하고, 지정된 작업을 실행하고, 커밋(또는 오류 시 중단)합니다. 새로운 콜백 API는 TransientTransactionError
또는 UnknownTransactionCommitResult
커밋 오류에 대한 재시도 로직을 통합합니다.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session| collection_one = client.use('mydb1')['foo'] collection_two = client.use('mydb2')['bar'] # Important: You must pass the session to the operations. collection_one.insert_one({'abc': 1}, session: my_session) collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( read_concern: {level: :local}, write_concern: {w: :majority, wtimeout: 1000}, read: {mode: :primary}, &callback)
참고
Scala 운전자 의 경우 대신 Core API 사용 예시 를 참조하세요.
Core API
코어 트랜잭션 API에는 다음과 같은 레이블이 지정된 오류에 대한 재시도 로직이 포함되어 있지 않습니다.
TransientTransactionError
. 트랜잭션의 작업이TransientTransactionError
레이블이 지정된 오류를 반환하는 경우 트랜잭션 전체를 다시 시도할 수 있습니다.TransientTransactionError
를 처리하려면 애플리케이션에 오류에 대한 재시도 로직을 명시적으로 통합해야 합니다.UnknownTransactionCommitResult
. 커밋이UnknownTransactionCommitResult
라는 레이블의 오류를 반환하면 커밋을 다시 시도할 수 있습니다.UnknownTransactionCommitResult
를 처리하려면 애플리케이션에 오류에 대한 재시도 로직을 명시적으로 통합해야 합니다.
예시
➤ 오른쪽 상단의 언어 선택 드롭다운 메뉴를 사용하여 이 페이지에 있는 예시의 언어를 설정하세요.
다음 예시에는 일시적인 오류에 대해 트랜잭션을 다시 시도하고 알 수 없는 커밋 오류에 대해 커밋을 다시 시도하는 로직이 포함되어 있습니다.
/* takes a session, an out-param for server reply, and out-param for error. */ typedef bool (*txn_func_t) (mongoc_client_session_t *, bson_t *, bson_error_t *); /* runs transactions with retry logic */ bool run_transaction_with_retry (txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error) { bson_t reply; bool r; while (true) { /* perform transaction */ r = txn_func (cs, &reply, error); if (r) { /* success */ bson_destroy (&reply); return true; } MONGOC_WARNING ("Transaction aborted: %s", error->message); if (mongoc_error_has_label (&reply, "TransientTransactionError")) { /* on transient error, retry the whole transaction */ MONGOC_WARNING ("TransientTransactionError, retrying transaction..."); bson_destroy (&reply); } else { /* non-transient error */ break; } } bson_destroy (&reply); return false; } /* commit transactions with retry logic */ bool commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error) { bson_t reply; bool r; while (true) { /* commit uses write concern set at transaction start, see * mongoc_transaction_opts_set_write_concern */ r = mongoc_client_session_commit_transaction (cs, &reply, error); if (r) { MONGOC_DEBUG ("Transaction committed"); break; } if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) { MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ..."); bson_destroy (&reply); } else { /* commit failed, cannot retry */ break; } } bson_destroy (&reply); return r; } /* updates two collections in a transaction and calls commit_with_retry */ bool update_employee_info (mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error) { mongoc_client_t *client; mongoc_collection_t *employees; mongoc_collection_t *events; mongoc_read_concern_t *rc; mongoc_write_concern_t *wc; mongoc_transaction_opt_t *txn_opts; bson_t opts = BSON_INITIALIZER; bson_t *filter = NULL; bson_t *update = NULL; bson_t *event = NULL; bool r; bson_init (reply); client = mongoc_client_session_get_client (cs); employees = mongoc_client_get_collection (client, "hr", "employees"); events = mongoc_client_get_collection (client, "reporting", "events"); rc = mongoc_read_concern_new (); mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); wc = mongoc_write_concern_new (); mongoc_write_concern_set_w ( wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_read_concern (txn_opts, rc); mongoc_transaction_opts_set_write_concern (txn_opts, wc); r = mongoc_client_session_start_transaction (cs, txn_opts, error); if (!r) { goto done; } r = mongoc_client_session_append (cs, &opts, error); if (!r) { goto done; } filter = BCON_NEW ("employee", BCON_INT32 (3)); update = BCON_NEW ("$set", "{", "status", "Inactive", "}"); /* mongoc_collection_update_one will reinitialize reply */ bson_destroy (reply); r = mongoc_collection_update_one (employees, filter, update, &opts, reply, error); if (!r) { goto abort; } event = BCON_NEW ("employee", BCON_INT32 (3)); BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}"); bson_destroy (reply); r = mongoc_collection_insert_one (events, event, &opts, reply, error); if (!r) { goto abort; } r = commit_with_retry (cs, error); abort: if (!r) { MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message); mongoc_client_session_abort_transaction (cs, NULL); } done: mongoc_collection_destroy (employees); mongoc_collection_destroy (events); mongoc_read_concern_destroy (rc); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); bson_destroy (&opts); bson_destroy (filter); bson_destroy (update); bson_destroy (event); return r; } void example_func (mongoc_client_t *client) { mongoc_client_session_t *cs; bson_error_t error; bool r; ASSERT (client); cs = mongoc_client_start_session (client, NULL, &error); if (!cs) { MONGOC_ERROR ("Could not start session: %s", error.message); return; } r = run_transaction_with_retry (update_employee_info, cs, &error); if (!r) { MONGOC_ERROR ("Could not update employee, permanent error: %s", error.message); } mongoc_client_session_destroy (cs); }
using transaction_func = std::function<void(client_session & session)>; auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) { while (true) { try { txn_func(session); // performs transaction. break; } catch (operation_exception const& oe) { std::cout << "Transaction aborted. Caught exception during transaction." << std::endl; // If transient error, retry the whole transaction. if (oe.has_error_label("TransientTransactionError")) { std::cout << "TransientTransactionError, retrying transaction ..." << std::endl; continue; } else { throw oe; } } } }; auto commit_with_retry = [](client_session& session) { while (true) { try { session.commit_transaction(); // Uses write concern set at transaction start. std::cout << "Transaction committed." << std::endl; break; } catch (operation_exception const& oe) { // Can retry commit if (oe.has_error_label("UnknownTransactionCommitResult")) { std::cout << "UnknownTransactionCommitResult, retrying commit operation ..." << std::endl; continue; } else { std::cout << "Error during commit ..." << std::endl; throw oe; } } } }; // Updates two collections in a transaction auto update_employee_info = [&](client_session& session) { auto& client = session.client(); auto employees = client["hr"]["employees"]; auto events = client["reporting"]["events"]; options::transaction txn_opts; read_concern rc; rc.acknowledge_level(read_concern::level::k_snapshot); txn_opts.read_concern(rc); write_concern wc; wc.acknowledge_level(write_concern::level::k_majority); txn_opts.write_concern(wc); session.start_transaction(txn_opts); try { employees.update_one( make_document(kvp("employee", 3)), make_document(kvp("$set", make_document(kvp("status", "Inactive"))))); events.insert_one(make_document( kvp("employee", 3), kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active"))))); } catch (operation_exception const& oe) { std::cout << "Caught exception during transaction, aborting." << std::endl; session.abort_transaction(); throw oe; } commit_with_retry(session); }; auto session = client.start_session(); try { run_transaction_with_retry(update_employee_info, session); } catch (operation_exception const& oe) { // Do something with error. throw oe; }
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session) { while (true) { try { txnFunc(client, session); // performs transaction break; } catch (MongoException exception) { // if transient error, retry the whole transaction if (exception.HasErrorLabel("TransientTransactionError")) { Console.WriteLine("TransientTransactionError, retrying transaction."); continue; } else { throw; } } } } public void CommitWithRetry(IClientSessionHandle session) { while (true) { try { session.CommitTransaction(); Console.WriteLine("Transaction committed."); break; } catch (MongoException exception) { // can retry commit if (exception.HasErrorLabel("UnknownTransactionCommitResult")) { Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation"); continue; } else { Console.WriteLine($"Error during commit: {exception.Message}."); throw; } } } } // updates two collections in a transaction public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session) { var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees"); var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events"); session.StartTransaction(new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority)); try { employeesCollection.UpdateOne( session, Builders<BsonDocument>.Filter.Eq("employee", 3), Builders<BsonDocument>.Update.Set("status", "Inactive")); eventsCollection.InsertOne( session, new BsonDocument { { "employee", 3 }, { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } } }); } catch (Exception exception) { Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}."); session.AbortTransaction(); throw; } CommitWithRetry(session); } public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client) { // start a session using (var session = client.StartSession()) { try { RunTransactionWithRetry(UpdateEmployeeInfo, client, session); } catch (Exception exception) { // do something with error Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}."); } } }
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error { for { err := txnFn(ctx) // Performs transaction. if err == nil { return nil } log.Println("Transaction aborted. Caught exception during transaction.") // If transient error, retry the whole transaction if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") { log.Println("TransientTransactionError, retrying transaction...") continue } return err } } commitWithRetry := func(ctx context.Context) error { sess := mongo.SessionFromContext(ctx) for { err := sess.CommitTransaction(ctx) switch e := err.(type) { case nil: log.Println("Transaction committed.") return nil case mongo.CommandError: // Can retry commit if e.HasErrorLabel("UnknownTransactionCommitResult") { log.Println("UnknownTransactionCommitResult, retrying commit operation...") continue } log.Println("Error during commit...") return e default: log.Println("Error during commit...") return e } } } // Updates two collections in a transaction. updateEmployeeInfo := func(ctx context.Context) error { employees := client.Database("hr").Collection("employees") events := client.Database("reporting").Collection("events") sess := mongo.SessionFromContext(ctx) err := sess.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.Majority()), ) if err != nil { return err } _, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) if err != nil { sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } _, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) if err != nil { sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } return commitWithRetry(ctx) } txnOpts := options.Transaction().SetReadPreference(readpref.Primary()) return client.UseSessionWithOptions( ctx, options.Session().SetDefaultTransactionOptions(txnOpts), func(ctx context.Context) error { return runTransactionWithRetry(ctx, updateEmployeeInfo) }, ) }
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
void runTransactionWithRetry(Runnable transactional) { while (true) { try { transactional.run(); break; } catch (MongoException e) { System.out.println("Transaction aborted. Caught exception during transaction."); if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { System.out.println("TransientTransactionError, aborting transaction and retrying ..."); continue; } else { throw e; } } } } void commitWithRetry(ClientSession clientSession) { while (true) { try { clientSession.commitTransaction(); System.out.println("Transaction committed"); break; } catch (MongoException e) { // can retry commit if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { System.out.println("UnknownTransactionCommitResult, retrying commit operation ..."); continue; } else { System.out.println("Exception during commit ..."); throw e; } } } } void updateEmployeeInfo() { MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees"); MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events"); TransactionOptions txnOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .build(); try (ClientSession clientSession = client.startSession()) { clientSession.startTransaction(txnOptions); employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")); eventsCollection.insertOne(clientSession, new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active"))); commitWithRetry(clientSession); } } void updateEmployeeInfoWithRetry() { runTransactionWithRetry(this::updateEmployeeInfo); }
참고
Motor 의 경우 대신 Callback API 를 참조하세요.
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
async function commitWithRetry(session) { try { await session.commitTransaction(); console.log('Transaction committed.'); } catch (error) { if (error.hasErrorLabel('UnknownTransactionCommitResult')) { console.log('UnknownTransactionCommitResult, retrying commit operation ...'); await commitWithRetry(session); } else { console.log('Error during commit ...'); throw error; } } } async function runTransactionWithRetry(txnFunc, client, session) { try { await txnFunc(client, session); } catch (error) { console.log('Transaction aborted. Caught exception during transaction.'); // If transient error, retry the whole transaction if (error.hasErrorLabel('TransientTransactionError')) { console.log('TransientTransactionError, retrying transaction ...'); await runTransactionWithRetry(txnFunc, client, session); } else { throw error; } } } async function updateEmployeeInfo(client, session) { session.startTransaction({ readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' }, readPreference: 'primary' }); const employeesCollection = client.db('hr').collection('employees'); const eventsCollection = client.db('reporting').collection('events'); await employeesCollection.updateOne( { employee: 3 }, { $set: { status: 'Inactive' } }, { session } ); await eventsCollection.insertOne( { employee: 3, status: { new: 'Inactive', old: 'Active' } }, { session } ); try { await commitWithRetry(session); } catch (error) { await session.abortTransaction(); throw error; } } return client.withSession(session => runTransactionWithRetry(updateEmployeeInfo, client, session) );
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
sub runTransactionWithRetry { my ( $txnFunc, $session ) = @_; LOOP: { eval { $txnFunc->($session); # performs transaction }; if ( my $error = $@ ) { print("Transaction aborted-> Caught exception during transaction.\n"); # If transient error, retry the whole transaction if ( $error->has_error_label("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ->..\n"); redo LOOP; } else { die $error; } } } return; } sub commitWithRetry { my ($session) = @_; LOOP: { eval { $session->commit_transaction(); # Uses write concern set at transaction start. print("Transaction committed->\n"); }; if ( my $error = $@ ) { # Can retry commit if ( $error->has_error_label("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ->..\n"); redo LOOP; } else { print("Error during commit ->..\n"); die $error; } } } return; } # Updates two collections in a transactions sub updateEmployeeInfo { my ($session) = @_; my $employeesCollection = $session->client->ns("hr.employees"); my $eventsCollection = $session->client->ns("reporting.events"); $session->start_transaction( { readConcern => { level => "snapshot" }, writeConcern => { w => "majority" }, readPreference => 'primary', } ); eval { $employeesCollection->update_one( { employee => 3 }, { '$set' => { status => "Inactive" } }, { session => $session}, ); $eventsCollection->insert_one( { employee => 3, status => { new => "Inactive", old => "Active" } }, { session => $session}, ); }; if ( my $error = $@ ) { print("Caught exception during transaction, aborting->\n"); $session->abort_transaction(); die $error; } commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval { runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) { # Do something with error } $session->end_session();
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void { while (true) { try { $txnFunc($client, $session); // performs transaction break; } catch (\MongoDB\Driver\Exception\CommandException $error) { $resultDoc = $error->getResultDocument(); // If transient error, retry the whole transaction if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) { continue; } else { throw $error; } } catch (\MongoDB\Driver\Exception\Exception $error) { throw $error; } } } private function commitWithRetry3(\MongoDB\Driver\Session $session): void { while (true) { try { $session->commitTransaction(); echo "Transaction committed.\n"; break; } catch (\MongoDB\Driver\Exception\CommandException $error) { $resultDoc = $error->getResultDocument(); if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) { echo "UnknownTransactionCommitResult, retrying commit operation ...\n"; continue; } else { echo "Error during commit ...\n"; throw $error; } } catch (\MongoDB\Driver\Exception\Exception $error) { echo "Error during commit ...\n"; throw $error; } } } private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void { $session->startTransaction([ 'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'), 'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY), 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY), ]); try { $client->hr->employees->updateOne( ['employee' => 3], ['$set' => ['status' => 'Inactive']], ['session' => $session], ); $client->reporting->events->insertOne( ['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']], ['session' => $session], ); } catch (\MongoDB\Driver\Exception\Exception $error) { echo "Caught exception during transaction, aborting.\n"; $session->abortTransaction(); throw $error; } $this->commitWithRetry3($session); } private function doUpdateEmployeeInfo(\MongoDB\Client $client): void { // Start a session. $session = $client->startSession(); try { $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session); } catch (\MongoDB\Driver\Exception\Exception) { // Do something with error } }
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
def run_transaction_with_retry(txn_func, session): while True: try: txn_func(session) # performs transaction break except (ConnectionFailure, OperationFailure) as exc: # If transient error, retry the whole transaction if exc.has_error_label("TransientTransactionError"): print("TransientTransactionError, retrying transaction ...") continue else: raise def commit_with_retry(session): while True: try: # Commit uses write concern set at transaction start. session.commit_transaction() print("Transaction committed.") break except (ConnectionFailure, OperationFailure) as exc: # Can retry commit if exc.has_error_label("UnknownTransactionCommitResult"): print("UnknownTransactionCommitResult, retrying commit operation ...") continue else: print("Error during commit ...") raise # Updates two collections in a transactions def update_employee_info(session): employees_coll = session.client.hr.employees events_coll = session.client.reporting.events with session.start_transaction( read_concern=ReadConcern("snapshot"), write_concern=WriteConcern(w="majority"), read_preference=ReadPreference.PRIMARY, ): employees_coll.update_one( {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session ) events_coll.insert_one( {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session ) commit_with_retry(session) # Start a session. with client.start_session() as session: try: run_transaction_with_retry(update_employee_info, session) except Exception: # Do something with error. raise
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
def run_transaction_with_retry(session) begin yield session # performs transaction rescue Mongo::Error => e puts 'Transaction aborted. Caught exception during transaction.' raise unless e.label?('TransientTransactionError') puts "TransientTransactionError, retrying transaction ..." retry end end def commit_with_retry(session) begin session.commit_transaction puts 'Transaction committed.' rescue Mongo::Error => e if e.label?('UnknownTransactionCommitResult') puts "UnknownTransactionCommitResult, retrying commit operation ..." retry else puts 'Error during commit ...' raise end end end # updates two collections in a transaction def update_employee_info(session) employees_coll = session.client.use(:hr)[:employees] events_coll = session.client.use(:reporting)[:events] session.start_transaction(read_concern: { level: :snapshot }, write_concern: { w: :majority }, read: {mode: :primary}) employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} }, session: session) events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } }, session: session) commit_with_retry(session) end session = client.start_session begin run_transaction_with_retry(session) do update_employee_info(session) end rescue StandardError => e # Do something with error raise end
중요
읽기 및 쓰기 작업을 트랜잭션과 연결하려면 반드시 트랜잭션의 각 작업에 세션을 전달해야 합니다.
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
드라이버 버전
트랜잭션 오류 처리
데이터베이스 시스템(MongoDB 또는 관계형 데이터베이스)에 관계없이 애플리케이션은 트랜잭션 커밋 중 오류를 처리하고 트랜잭션에 대한 재시도 로직을 통합하는 조치를 취해야 합니다.
TransientTransactionError
retryWrites
의 값에 관계없이 트랜잭션 내부의 개별 쓰기 작업은 다시 시도할 수 없습니다. 작업에서 레이블과 관련된 "TransientTransactionError"
오류가 발생하면(예: 프라이머리 스텝다운), 트랜잭션 전체를 다시 시도할 수 있습니다.
콜백 API는
"TransientTransactionError"
에 대한 재시도 로직을 통합합니다.코어 트랜잭션 API에는
"TransientTransactionError"
에 대한 재시도 로직이 통합되어 있지 않습니다."TransientTransactionError"
를 처리하려면 애플리케이션에 오류에 대한 재시도 로직을 명시적으로 통합해야 합니다. 일시적 오류에 대한 재시도 논리를 포함하는 예시를 보려면 Core API 예시를 참조하세요.
UnknownTransactionCommitResult
커밋 작업은 재시도 가능한 쓰기 작업 입니다. 커밋 작업에서 오류가 발생하면 retryWrites
의 값에 관계없이 MongoDB 드라이버는 커밋을 다시 시도합니다.
커밋 작업에서 "UnknownTransactionCommitResult"
레이블이 지정된 오류가 발생하면 커밋을 다시 시도할 수 있습니다.
콜백 API는
"UnknownTransactionCommitResult"
에 대한 재시도 로직을 통합합니다.코어 트랜잭션 API에는
"UnknownTransactionCommitResult"
에 대한 재시도 로직이 통합되어 있지 않습니다."UnknownTransactionCommitResult"
를 처리하려면 애플리케이션에 오류에 대한 재시도 로직을 명시적으로 통합해야 합니다. 알 수 없는 커밋 오류에 대한 재시도 로직을 포함하는 예시를 보려면 Core API 예시를 참조하세요.
TransactionTooLargeForCache
버전 6.2에 추가되었습니다.
MongoDB 6.2부터 서버는 TransactionTooLargeForCache
오류를 수신하는 경우 트랜잭션을 재시도하지 않습니다. 이 오류는 캐시가 너무 작아 재시도에 실패할 가능성이 높다는 의미입니다.
transactionTooLargeForCacheThreshold
임계값의 기본값은 0.75
입니다. 트랜잭션이 캐시의 75%를 초과하여 사용하는 경우, 서버는 트랜잭션을 재시도하는 대신 TransactionTooLargeForCache
를 반환합니다.
이전 버전의 MongoDB에서는 서버가 TransactionTooLargeForCache
대신 TemporarilyUnavailable
또는 WriteConflict
를 반환합니다.
오류 임계값을 수정하려면 setParameter
명령을 사용합니다.
추가 정보
mongosh
예시
다음의 mongosh
메서드는 트랜잭션에 사용할 수 있습니다.
참고
mongosh
예시에서는 단순화를 위해 재시도 로직과 강력한 오류 처리를 생략했습니다. 애플리케이션에 트랜잭션을 통합하는 더 실용적인 예시는 대신 트랜잭션 오류 처리를 확인하세요.
// Create collections: db.getSiblingDB("mydb1").foo.insertOne( {abc: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } ) db.getSiblingDB("mydb2").bar.insertOne( {xyz: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } ) // Start a session. session = db.getMongo().startSession( { readPreference: { mode: "primary" } } ); coll1 = session.getDatabase("mydb1").foo; coll2 = session.getDatabase("mydb2").bar; // Start a transaction session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } ); // Operations inside the transaction try { coll1.insertOne( { abc: 1 } ); coll2.insertOne( { xyz: 999 } ); } catch (error) { // Abort transaction on error session.abortTransaction(); throw error; } // Commit the transaction using write concern set at transaction start session.commitTransaction(); session.endSession();