
Drivers API

On this page

  • Callback API vs Core API
  • Callback API
  • Core API
  • Driver Versions
  • Transaction Error Handling
  • Additional Information

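Callback API:

Core API:

  • Requires an explicit call to start the transaction and an explicit call to commit it.

  • Does not incorporate error handling logic for TransientTransactionError and UnknownTransactionCommitResult. Instead, it gives you the flexibility to implement custom error handling for these errors.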
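As a rough illustration of the custom error handling that the Core API leaves to the application, the sketch below caps the number of whole-transaction retries. It is driver-neutral and runs against stand-ins: `LabeledError`, `RecordingSession`, `run_with_bounded_retries`, and the `max_attempts` policy are all hypothetical names invented for this sketch, not part of any MongoDB driver.

```python
TRANSIENT = "TransientTransactionError"


class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self.labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self.labels


class RecordingSession:
    """Hypothetical session that only records which calls were made."""

    def __init__(self):
        self.events = []

    def start_transaction(self):
        self.events.append("start")

    def commit_transaction(self):
        self.events.append("commit")

    def abort_transaction(self):
        self.events.append("abort")


def run_with_bounded_retries(session, operations, max_attempts=3):
    """Start and commit explicitly; retry transient failures up to a cap."""
    for attempt in range(1, max_attempts + 1):
        session.start_transaction()  # explicit start
        try:
            operations(session)
        except LabeledError as exc:
            session.abort_transaction()
            if exc.has_error_label(TRANSIENT) and attempt < max_attempts:
                continue  # custom policy: bounded retry of the whole transaction
            raise
        session.commit_transaction()  # explicit commit
        return attempt
    raise ValueError("max_attempts must be at least 1")
```

The function returns the attempt number that succeeded. A real application would also decide how to treat commit errors, which this sketch lets propagate unchanged.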

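The Callback API incorporates retry logic for errors labeled TransientTransactionError or UnknownTransactionCommitResult.

Starting in MongoDB 6.2, the server does not retry the transaction if it receives a TransactionTooLargeForCache error.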
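The behavior this page attributes to the Callback API (start a transaction, run the callback, commit, and retry on the two labeled errors) can be sketched in a driver-neutral way. This is a simplified model under stated assumptions, not any driver's actual implementation: `LabeledError`, `ScriptedSession`, and `with_transaction_sketch` are hypothetical names, and real drivers may add limits or details that are omitted here.

```python
TRANSIENT = "TransientTransactionError"
UNKNOWN_COMMIT = "UnknownTransactionCommitResult"


class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self.labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self.labels


class ScriptedSession:
    """Hypothetical session whose commits fail with the labels it is given."""

    def __init__(self, commit_failures=()):
        self.commit_failures = list(commit_failures)
        self.events = []

    def start_transaction(self):
        self.events.append("start")

    def abort_transaction(self):
        self.events.append("abort")

    def commit_transaction(self):
        if self.commit_failures:
            self.events.append("commit-error")
            raise LabeledError("commit failed", [self.commit_failures.pop(0)])
        self.events.append("commit")


def with_transaction_sketch(session, callback):
    """Start, run the callback, commit; retry on the two labeled errors."""
    while True:
        session.start_transaction()
        try:
            result = callback(session)
        except Exception as exc:
            session.abort_transaction()  # abort on any callback error
            if isinstance(exc, LabeledError) and exc.has_error_label(TRANSIENT):
                continue  # rerun the whole transaction
            raise
        while True:
            try:
                session.commit_transaction()
                return result
            except LabeledError as exc:
                if exc.has_error_label(UNKNOWN_COMMIT):
                    continue  # retry only the commit
                if exc.has_error_label(TRANSIENT):
                    break  # restart the whole transaction
                raise
```

The two nested loops mirror the two retry scopes: a transient error reruns the callback from the start, while an unknown commit result repeats only the commit.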




Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

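/* Forward declaration: callback is defined after with_transaction_example. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error);
static bool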
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
read_concern rc_local{};
rc_local.acknowledge_level(read_concern::level::k_local);
read_preference rp_primary{};
rp_primary.mode(read_preference::read_mode::k_primary);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
opts.read_concern(rc_local);
opts.read_preference(rp_primary);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
var result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances; e.g.
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
    collection_one = my_session.client.mydb1.foo
    collection_two = my_session.client.mydb2.bar
    # Important: You must pass the session to the operations.
    await collection_one.insert_one({"abc": 1}, session=my_session)
    await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    await session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

Note

For the Perl driver, see the Core API usage example instead.

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
    collection_one = session.client.mydb1.foo
    collection_two = session.client.mydb2.bar
    # Important: You must pass the session to the operations.
    collection_one.insert_one({"abc": 1}, session=session)
    collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )

Important

This example uses the new Callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new Callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uri_string = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
# Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

Note

For the Scala driver, see the Core API usage example instead.

The Core transaction API does not incorporate retry logic for errors labeled TransientTransactionError or UnknownTransactionCommitResult.




The following example incorporates logic to retry the transaction for transient errors and to retry the commit for unknown commit errors:

/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t) (mongoc_client_session_t *, bson_t *, bson_error_t *);
/* runs transactions with retry logic */
bool
run_transaction_with_retry (txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* perform transaction */
r = txn_func (cs, &reply, error);
if (r) {
/* success */
bson_destroy (&reply);
return true;
}
MONGOC_WARNING ("Transaction aborted: %s", error->message);
if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
/* on transient error, retry the whole transaction */
MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
bson_destroy (&reply);
} else {
/* non-transient error */
break;
}
}
bson_destroy (&reply);
return false;
}
/* commit transactions with retry logic */
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* commit uses write concern set at transaction start, see
* mongoc_transaction_opts_set_write_concern */
r = mongoc_client_session_commit_transaction (cs, &reply, error);
if (r) {
MONGOC_DEBUG ("Transaction committed");
break;
}
if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
bson_destroy (&reply);
} else {
/* commit failed, cannot retry */
break;
}
}
bson_destroy (&reply);
return r;
}
/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info (mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error)
{
mongoc_client_t *client;
mongoc_collection_t *employees;
mongoc_collection_t *events;
mongoc_read_concern_t *rc;
mongoc_write_concern_t *wc;
mongoc_transaction_opt_t *txn_opts;
bson_t opts = BSON_INITIALIZER;
bson_t *filter = NULL;
bson_t *update = NULL;
bson_t *event = NULL;
bool r;
bson_init (reply);
client = mongoc_client_session_get_client (cs);
employees = mongoc_client_get_collection (client, "hr", "employees");
events = mongoc_client_get_collection (client, "reporting", "events");
rc = mongoc_read_concern_new ();
mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_w (
wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_read_concern (txn_opts, rc);
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
r = mongoc_client_session_start_transaction (cs, txn_opts, error);
if (!r) {
goto done;
}
r = mongoc_client_session_append (cs, &opts, error);
if (!r) {
goto done;
}
filter = BCON_NEW ("employee", BCON_INT32 (3));
update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
/* mongoc_collection_update_one will reinitialize reply */
bson_destroy (reply);
r = mongoc_collection_update_one (employees, filter, update, &opts, reply, error);
if (!r) {
goto abort;
}
event = BCON_NEW ("employee", BCON_INT32 (3));
BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");
bson_destroy (reply);
r = mongoc_collection_insert_one (events, event, &opts, reply, error);
if (!r) {
goto abort;
}
r = commit_with_retry (cs, error);
abort:
if (!r) {
MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
mongoc_client_session_abort_transaction (cs, NULL);
}
done:
mongoc_collection_destroy (employees);
mongoc_collection_destroy (events);
mongoc_read_concern_destroy (rc);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
bson_destroy (&opts);
bson_destroy (filter);
bson_destroy (update);
bson_destroy (event);
return r;
}
void
example_func (mongoc_client_t *client)
{
mongoc_client_session_t *cs;
bson_error_t error;
bool r;
ASSERT (client);
cs = mongoc_client_start_session (client, NULL, &error);
if (!cs) {
MONGOC_ERROR ("Could not start session: %s", error.message);
return;
}
r = run_transaction_with_retry (update_employee_info, cs, &error);
if (!r) {
MONGOC_ERROR ("Could not update employee, permanent error: %s", error.message);
}
mongoc_client_session_destroy (cs);
}
using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
while (true) {
try {
txn_func(session); // performs transaction.
break;
} catch (const operation_exception& oe) {
std::cout << "Transaction aborted. Caught exception during transaction."
<< std::endl;
// If transient error, retry the whole transaction.
if (oe.has_error_label("TransientTransactionError")) {
std::cout << "TransientTransactionError, retrying transaction ..."
<< std::endl;
continue;
} else {
throw; // rethrow the original exception without copying (avoids slicing)
}
}
}
};
auto commit_with_retry = [](client_session& session) {
while (true) {
try {
session.commit_transaction(); // Uses write concern set at transaction start.
std::cout << "Transaction committed." << std::endl;
break;
} catch (const operation_exception& oe) {
// Can retry commit
if (oe.has_error_label("UnknownTransactionCommitResult")) {
std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
<< std::endl;
continue;
} else {
std::cout << "Error during commit ..." << std::endl;
throw; // rethrow the original exception without copying (avoids slicing)
}
}
}
};
// Updates two collections in a transaction
auto update_employee_info = [&](client_session& session) {
auto& client = session.client();
auto employees = client["hr"]["employees"];
auto events = client["reporting"]["events"];
options::transaction txn_opts;
read_concern rc;
rc.acknowledge_level(read_concern::level::k_snapshot);
txn_opts.read_concern(rc);
write_concern wc;
wc.acknowledge_level(write_concern::level::k_majority);
txn_opts.write_concern(wc);
session.start_transaction(txn_opts);
try {
// Pass the session so that each operation runs inside the transaction.
employees.update_one(
session,
make_document(kvp("employee", 3)),
make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
events.insert_one(
session,
make_document(
kvp("employee", 3),
kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
} catch (const operation_exception& oe) {
std::cout << "Caught exception during transaction, aborting." << std::endl;
session.abort_transaction();
throw; // rethrow the original exception without copying (avoids slicing)
}
commit_with_retry(session);
};
auto session = client.start_session();
try {
run_transaction_with_retry(update_employee_info, session);
} catch (const operation_exception& oe) {
// Do something with error.
throw; // rethrow the original exception without copying (avoids slicing)
}
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
while (true)
{
try
{
txnFunc(client, session); // performs transaction
break;
}
catch (MongoException exception)
{
// if transient error, retry the whole transaction
if (exception.HasErrorLabel("TransientTransactionError"))
{
Console.WriteLine("TransientTransactionError, retrying transaction.");
continue;
}
else
{
throw;
}
}
}
}
public void CommitWithRetry(IClientSessionHandle session)
{
while (true)
{
try
{
session.CommitTransaction();
Console.WriteLine("Transaction committed.");
break;
}
catch (MongoException exception)
{
// can retry commit
if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
{
Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
continue;
}
else
{
Console.WriteLine($"Error during commit: {exception.Message}.");
throw;
}
}
}
}
// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");
session.StartTransaction(new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority));
try
{
employeesCollection.UpdateOne(
session,
Builders<BsonDocument>.Filter.Eq("employee", 3),
Builders<BsonDocument>.Update.Set("status", "Inactive"));
eventsCollection.InsertOne(
session,
new BsonDocument
{
{ "employee", 3 },
{ "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
});
}
catch (Exception exception)
{
Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
session.AbortTransaction();
throw;
}
CommitWithRetry(session);
}
public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
// start a session
using (var session = client.StartSession())
{
try
{
RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
}
catch (Exception exception)
{
// do something with error
Console.WriteLine($"Non transient exception caught during transaction: {exception.Message}.");
}
}
}
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error {
for {
err := txnFn(ctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
commitWithRetry := func(ctx context.Context) error {
sess := mongo.SessionFromContext(ctx)
for {
err := sess.CommitTransaction(ctx)
switch e := err.(type) {
case nil:
log.Println("Transaction committed.")
return nil
case mongo.CommandError:
// Can retry commit
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation...")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}
// Updates two collections in a transaction.
updateEmployeeInfo := func(ctx context.Context) error {
employees := client.Database("hr").Collection("employees")
events := client.Database("reporting").Collection("events")
sess := mongo.SessionFromContext(ctx)
err := sess.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
}
_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
return commitWithRetry(ctx)
}
txnOpts := options.Transaction().SetReadPreference(readpref.Primary())
return client.UseSessionWithOptions(
ctx, options.Session().SetDefaultTransactionOptions(txnOpts),
func(ctx context.Context) error {
return runTransactionWithRetry(ctx, updateEmployeeInfo)
},
)
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
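To make the consequence of omitting the session concrete, here is a toy in-memory model. It is not how any driver or the server is implemented: `ToySession` and `ToyCollection` are hypothetical classes invented for this sketch, and the buffering is a simplification. It only illustrates that an operation issued without the session is not part of the transaction, so aborting does not undo it.

```python
class ToySession:
    """Hypothetical session that buffers writes while a transaction is open."""

    def __init__(self):
        self.in_transaction = False
        self.pending = []

    def start_transaction(self):
        self.in_transaction = True

    def commit_transaction(self):
        for target, doc in self.pending:
            target.append(doc)  # apply buffered writes on commit
        self._end()

    def abort_transaction(self):
        self._end()  # buffered writes are discarded

    def _end(self):
        self.pending = []
        self.in_transaction = False


class ToyCollection:
    """Hypothetical collection backed by a plain list."""

    def __init__(self):
        self.docs = []

    def insert_one(self, doc, session=None):
        if session is not None and session.in_transaction:
            session.pending.append((self.docs, doc))  # tied to the transaction
        else:
            self.docs.append(doc)  # applied immediately, outside any transaction


coll = ToyCollection()
session = ToySession()
session.start_transaction()
coll.insert_one({"abc": 1}, session=session)  # associated with the transaction
coll.insert_one({"abc": 2})                   # session omitted
session.abort_transaction()
print(coll.docs)  # [{'abc': 2}]
```

Only the write that omitted the session survives the abort, which is why every operation that belongs to the transaction must receive the session.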

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");
            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}

void updateEmployeeInfo() {
    MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
    MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");
    TransactionOptions txnOptions = TransactionOptions.builder()
            .readPreference(ReadPreference.primary())
            .readConcern(ReadConcern.MAJORITY)
            .writeConcern(WriteConcern.MAJORITY)
            .build();
    try (ClientSession clientSession = client.startSession()) {
        clientSession.startTransaction(txnOptions);
        employeesCollection.updateOne(clientSession,
                Filters.eq("employee", 3),
                Updates.set("status", "Inactive"));
        eventsCollection.insertOne(clientSession,
                new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));
        commitWithRetry(clientSession);
    }
}

void updateEmployeeInfoWithRetry() {
    runTransactionWithRetry(this::updateEmployeeInfo);
}

Note

For Motor, see the Callback API instead.

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');
    // If transient error, retry the whole transaction
    if (error.hasErrorLabel('TransientTransactionError')) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

async function updateEmployeeInfo(client, session) {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  });
  const employeesCollection = client.db('hr').collection('employees');
  const eventsCollection = client.db('reporting').collection('events');
  await employeesCollection.updateOne(
    { employee: 3 },
    { $set: { status: 'Inactive' } },
    { session }
  );
  await eventsCollection.insertOne(
    {
      employee: 3,
      status: { new: 'Inactive', old: 'Active' }
    },
    { session }
  );
  try {
    await commitWithRetry(session);
  } catch (error) {
    await session.abortTransaction();
    throw error;
  }
}

return client.withSession(session =>
  runTransactionWithRetry(updateEmployeeInfo, client, session)
);

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;
    LOOP: {
        eval {
            $txnFunc->($session); # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted. Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ...\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }
    return;
}

sub commitWithRetry {
    my ($session) = @_;
    LOOP: {
        eval {
            $session->commit_transaction(); # Uses write concern set at transaction start.
            print("Transaction committed.\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...\n");
                redo LOOP;
            }
            else {
                print("Error during commit ...\n");
                die $error;
            }
        }
    }
    return;
}

# Updates two collections in a transaction
sub updateEmployeeInfo {
    my ($session) = @_;
    my $employeesCollection = $session->client->ns("hr.employees");
    my $eventsCollection = $session->client->ns("reporting.events");
    $session->start_transaction(
        {
            readConcern => { level => "snapshot" },
            writeConcern => { w => "majority" },
            readPreference => 'primary',
        }
    );
    eval {
        $employeesCollection->update_one(
            { employee => 3 }, { '$set' => { status => "Inactive" } },
            { session => $session},
        );
        $eventsCollection->insert_one(
            { employee => 3, status => { new => "Inactive", old => "Active" } },
            { session => $session},
        );
    };
    if ( my $error = $@ ) {
        print("Caught exception during transaction, aborting.\n");
        $session->abort_transaction();
        die $error;
    }
    commitWithRetry($session);
}

# Start a session
my $session = $client->start_session();
eval {
    runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
    # Do something with error
}
$session->end_session();

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
    while (true) {
        try {
            $txnFunc($client, $session); // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();
            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}

private function commitWithRetry3(\MongoDB\Driver\Session $session): void
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();
            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}

private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
    $session->startTransaction([
        'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),
        'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
    ]);
    try {
        $client->hr->employees->updateOne(
            ['employee' => 3],
            ['$set' => ['status' => 'Inactive']],
            ['session' => $session],
        );
        $client->reporting->events->insertOne(
            ['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],
            ['session' => $session],
        );
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        echo "Caught exception during transaction, aborting.\n";
        $session->abortTransaction();
        throw $error;
    }
    $this->commitWithRetry3($session);
}

private function doUpdateEmployeeInfo(\MongoDB\Client $client): void
{
    // Start a session.
    $session = $client->startSession();
    try {
        $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
    } catch (\MongoDB\Driver\Exception\Exception) {
        // Do something with error
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

from pymongo import ReadPreference
from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern


def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying transaction ...")
                continue
            else:
                raise


def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise


# Updates two collections in a transaction
def update_employee_info(session):
    employees_coll = session.client.hr.employees
    events_coll = session.client.reporting.events
    with session.start_transaction(
        read_concern=ReadConcern("snapshot"),
        write_concern=WriteConcern(w="majority"),
        read_preference=ReadPreference.PRIMARY,
    ):
        employees_coll.update_one(
            {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
        )
        events_coll.insert_one(
            {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
        )
        commit_with_retry(session)


# Start a session. `client` is an existing MongoClient.
with client.start_session() as session:
    try:
        run_transaction_with_retry(update_employee_info, session)
    except Exception:
        # Do something with error.
        raise

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e
    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?('TransientTransactionError')
    puts "TransientTransactionError, retrying transaction ..."
    retry
  end
end

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error => e
    if e.label?('UnknownTransactionCommitResult')
      puts "UnknownTransactionCommitResult, retrying commit operation ..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

# updates two collections in a transaction
def update_employee_info(session)
  employees_coll = session.client.use(:hr)[:employees]
  events_coll = session.client.use(:reporting)[:events]
  session.start_transaction(read_concern: { level: :snapshot },
                            write_concern: { w: :majority },
                            read: {mode: :primary})
  employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
                            session: session)
  events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
                         session: session)
  commit_with_retry(session)
end

session = client.start_session
begin
  run_transaction_with_retry(session) do
    update_employee_info(session)
  end
rescue StandardError => e
  # Do something with error
  raise
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala

import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration

//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
  // Implicit functions that execute the Observable and return the results
  val waitDuration = Duration(5, "seconds")
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
  }
  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), waitDuration)
  }
  // end implicit functions

  "The Scala driver" should "be able to commit a transaction" in withClient { client =>
    assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
    client.getDatabase("hr").drop().execute()
    client.getDatabase("hr").createCollection("employees").execute()
    client.getDatabase("hr").createCollection("events").execute()
    updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
    client.getDatabase("hr").drop().execute() should equal(Completed())
  }

  def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")
      val transactionOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))
      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Completed] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
    runTransactionAndRetry(commitAndRetryObservable)
  }
}

Regardless of the database system, whether MongoDB or a relational database, applications should handle errors during transaction commits and incorporate retry logic for transactions.

The individual write operations inside a transaction are not retryable, regardless of the value of retryWrites. If an operation encounters an error associated with the label "TransientTransactionError", such as when the primary steps down, the transaction as a whole can be retried.

  • The Callback API incorporates retry logic for "TransientTransactionError".

  • The Core transaction API does not incorporate retry logic for "TransientTransactionError". To handle "TransientTransactionError", applications should explicitly incorporate retry logic for the error. For an example that incorporates retry logic for transient errors, see the Core API example.
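The whole-transaction retry described above can be sketched without a running server. The following is a minimal illustration, not driver code: `LabeledError` is a hypothetical stand-in for a driver exception that exposes `has_error_label`, and the attempt cap and backoff in `run_with_transient_retry` are assumptions added here so the loop cannot spin forever.

```python
import time


class LabeledError(Exception):
    """Hypothetical stand-in for a driver exception that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


def run_with_transient_retry(txn_func, max_attempts=5, base_delay=0.0):
    """Run txn_func, retrying the whole function on TransientTransactionError.

    Returns the number of attempts used. Any other error is re-raised at once,
    and the transient error is re-raised when max_attempts is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            txn_func()  # start, run, and commit the transaction
            return attempt
        except LabeledError as exc:
            transient = exc.has_error_label("TransientTransactionError")
            if not transient or attempt == max_attempts:
                raise
            # Exponential backoff between attempts (zero by default).
            time.sleep(base_delay * 2 ** (attempt - 1))


# Simulated transaction body: fails twice with a transient error, then succeeds.
calls = {"count": 0}


def flaky_transaction():
    calls["count"] += 1
    if calls["count"] < 3:
        raise LabeledError("primary stepped down", ["TransientTransactionError"])


attempts_used = run_with_transient_retry(flaky_transaction)
print(attempts_used)  # 3
```

With a real driver, the function passed in would start the transaction, run the operations with the session, and commit, so that each retry begins a fresh transaction.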

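Commit operations are retryable write operations. If a commit operation encounters an error, MongoDB drivers retry the commit regardless of the value of retryWrites.

If a commit operation encounters an error labeled "UnknownTransactionCommitResult", the commit can be retried.

  • The Callback API incorporates retry logic for "UnknownTransactionCommitResult".

  • The Core transaction API does not incorporate retry logic for "UnknownTransactionCommitResult". To handle "UnknownTransactionCommitResult", applications should explicitly incorporate retry logic for the error. For an example that incorporates retry logic for unknown commit errors, see the Core API example.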
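A commit-only retry differs from the whole-transaction retry in that only the commit call is repeated. The sketch below illustrates this with stand-ins: `LabeledError` and `FakeSession` are hypothetical classes invented for the example, and the time limit in `commit_with_deadline` is an assumption added here to bound the loop, not behavior taken from this page.

```python
import time


class LabeledError(Exception):
    """Hypothetical stand-in for a driver exception that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


class FakeSession:
    """Hypothetical session whose commit fails a fixed number of times."""

    def __init__(self, unknown_failures):
        self.unknown_failures = unknown_failures
        self.commit_calls = 0
        self.committed = False

    def commit_transaction(self):
        self.commit_calls += 1
        if self.commit_calls <= self.unknown_failures:
            raise LabeledError(
                "commit result unknown", ["UnknownTransactionCommitResult"]
            )
        self.committed = True


def commit_with_deadline(session, timeout_seconds=5.0, clock=time.monotonic):
    """Retry only the commit while the error is UnknownTransactionCommitResult.

    Gives up and re-raises once timeout_seconds have elapsed, or immediately
    for any error without the label.
    """
    deadline = clock() + timeout_seconds
    while True:
        try:
            session.commit_transaction()
            return
        except LabeledError as exc:
            retryable = exc.has_error_label("UnknownTransactionCommitResult")
            if not retryable or clock() >= deadline:
                raise


session = FakeSession(unknown_failures=2)
commit_with_deadline(session)
print(session.commit_calls, session.committed)  # 3 True
```

Passing the clock as a parameter is a design choice for testability: a fake clock can simulate an expired deadline without sleeping.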

New in version 6.2

Starting in MongoDB 6.2, the server does not retry the transaction if it receives a TransactionTooLargeForCache error. This error means the cache is too small and a retry is likely to fail.

The default value for the transactionTooLargeForCacheThreshold threshold is 0.75. When a transaction uses more than 75% of the cache, the server returns TransactionTooLargeForCache instead of retrying the transaction.

In earlier versions of MongoDB, the server returns TemporarilyUnavailable or WriteConflict instead of TransactionTooLargeForCache.

To modify the error threshold, use the setParameter command.
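To make the threshold concrete, the sketch below works through the arithmetic and builds the command document that would be sent with setParameter. Both helper functions are hypothetical names introduced for illustration, and the 0.0 to 1.0 range check is an assumption based on the threshold being a fraction of the cache; the server's actual accounting is not reproduced here.

```python
def cache_limit_bytes(cache_size_bytes, threshold=0.75):
    """Bytes of cache a transaction may use before crossing the threshold."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    return int(cache_size_bytes * threshold)


def set_threshold_command(threshold):
    """Build a setParameter command document for the threshold.

    The command name must be the first key; Python dicts keep insertion order.
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    return {"setParameter": 1, "transactionTooLargeForCacheThreshold": threshold}


one_gib = 1024 ** 3
print(cache_limit_bytes(one_gib))  # 805306368 bytes, that is 768 MiB

cmd = set_threshold_command(0.5)
print(cmd)
# With PyMongo, the document could be sent to the admin database, for example
# client.admin.command(cmd). That call is not executed in this sketch.
```

With the default of 0.75 and a 1 GiB cache, a transaction crosses the threshold once it uses more than 768 MiB of cache.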

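The following mongosh methods are available for transactions: Session.startTransaction(), Session.commitTransaction(), and Session.abortTransaction().

Note

The mongosh example omits retry logic and robust error handling for simplicity. For a more practical example of incorporating transactions in applications, see Transaction Error Handling instead.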

// Create collections:
db.getSiblingDB("mydb1").foo.insertOne(
    {abc: 0},
    { writeConcern: { w: "majority", wtimeout: 2000 } }
)
db.getSiblingDB("mydb2").bar.insertOne(
    {xyz: 0},
    { writeConcern: { w: "majority", wtimeout: 2000 } }
)
// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );
coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;
// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );
// Operations inside the transaction
try {
    coll1.insertOne( { abc: 1 } );
    coll2.insertOne( { xyz: 999 } );
} catch (error) {
    // Abort transaction on error
    session.abortTransaction();
    throw error;
}
// Commit the transaction using write concern set at transaction start
session.commitTransaction();
session.endSession();
