Docs Menu
Docs Home
/
MongoDB マニュアル

トランザクション

項目一覧

  • トランザクション API
  • トランザクションと不可分性
  • トランザクションと操作
  • トランザクションとセッション
  • 読み取り保証/書込み保証/読み込み設定
  • 一般情報
  • トランザクションのその他のトピック

MongoDB では単一ドキュメントに対する操作はアトミックなものとなります。単一ドキュメント構造内でのデータ間の関係をキャプチャするには、複数のドキュメントとコレクションにわたる正規化プロセスを経る代わりに埋め込み配列を利用できるため、単一ドキュメントのこうしたアトミック性によって多数の実用的なユースケースで分散トランザクションは不要になります。

複数のドキュメント (単一または複数のコレクション内) への読み取りと書き込みのアトミック性が必要な状況では、MongoDB は分散トランザクションをサポートします。分散トランザクションを使用すると、トランザクションを複数の操作、コレクション、データベース、ドキュメント、およびシャードにわたって使用できます。

このページの情報は、次の環境でホストされている配置に適用されます。

  • MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです

  • MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン

  • MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン


➤ 右上の言語の選択ドロップダウンメニューを使用して、以下の例の言語を設定します。


この例では、トランザクション API の主要なコンポーネントが強調表示されています。

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback API TransientTransactionErrorには、{0 または のいずれのコミットUnknownTransactionCommitResult エラーの再試行ロジックも組み込まれています。

重要

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
read_concern rc_local{};
rc_local.acknowledge_level(read_concern::level::k_local);
read_preference rp_primary{};
rp_primary.mode(read_preference::read_mode::k_primary);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
opts.read_concern(rc_local);
opts.read_preference(rp_primary);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 "TransientTransactionError"または"UnknownTransactionCommitResult"コミット エラーの再試行ロジックが組み込まれています。

重要

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances.
For example:
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 "TransientTransactionError"または"UnknownTransactionCommitResult"コミット エラーの再試行ロジックが組み込まれています。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

この例ではCore APIを使用しています。 Core API には"TransientTransactionError"または"UnknownTransactionCommitResult"のいずれのコミット エラーの再試行ロジックも組み込まれていないため、この例にはこれらのエラーに対してトランザクションを再試行するための明示的なロジックが含まれています。

重要

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、TransientTransactionError または UnknownTransactionCommitResult のコミット エラーに対する再試行ロジックも組み込まれています。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

この例では、トランザクションを操作するために新しいCallback APIを使用します。これにより、トランザクションが開始され、指定された操作が実行され、コミット(エラーの場合は中止)されます。 新しいCallback APIには、 "TransientTransactionError"または"UnknownTransactionCommitResult"コミット エラーの再試行ロジックが組み込まれています。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?
// replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g.
// let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
let client = Client::with_uri_str(uri).await?;
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client
.database("mydb1")
.collection::<Document>("foo")
.insert_one(doc! { "abc": 0})
.await?;
client
.database("mydb2")
.collection::<Document>("bar")
.insert_one(doc! { "xyz": 0})
.await?;
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transaction.
async fn callback(session: &mut ClientSession) -> Result<()> {
let collection_one = session
.client()
.database("mydb1")
.collection::<Document>("foo");
let collection_two = session
.client()
.database("mydb2")
.collection::<Document>("bar");
// Important: You must pass the session to the operations.
collection_one
.insert_one(doc! { "abc": 1 })
.session(&mut *session)
.await?;
collection_two
.insert_one(doc! { "xyz": 999 })
.session(session)
.await?;
Ok(())
}
// Step 2: Start a client session.
let mut session = client.start_session().await?;
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
// abort on error).
session
.start_transaction()
.and_run((), |session, _| callback(session).boxed())
.await?;

この例ではCore APIを使用しています。 Core API には"TransientTransactionError"または"UnknownTransactionCommitResult"のいずれのコミット エラーの再試行ロジックも組み込まれていないため、この例にはこれらのエラーに対してトランザクションを再試行するための明示的なロジックが含まれています。

重要

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

Tip

以下も参照してください。

MongoDB は(単一または複数のコレクション内の)複数のドキュメントへの読み取りと書込みにアトミック性を必要とする状況で、レプリカセットやシャーディングされたクラスターでのトランザクションを含む分散トランザクションをサポートします。

分散トランザクションは次のとおりアトミックな性質を持ちます。

  • トランザクションは、データ変更をすべて適用するか、変更をロールバックします。

  • トランザクションがコミットされると、トランザクション内で行われたすべてのデータ変更が保存され、トランザクション外でも表示されます。

    トランザクションがコミットされるまで、トランザクションで行われたデータ変更はトランザクションの外部には表示されません。

    ただし、トランザクションが複数のシャードに書き込む場合、すべての外部読み取り操作が、コミットされたトランザクションの結果がシャード全体で表示されるまで待機する必要はありません。たとえば、トランザクションがコミットされ、書込み 1 がシャード A で表示されているものの、書込み 2 がシャード B にまだ表示されていない場合、読み取り保証(read concern) "local" での外部読み取りは、書き込み 2 を見ることなく書き込み 1 の結果を読み取ることができます。

  • トランザクションが中止されると、トランザクションで行われたすべてのデータの変更は表示されることなく破棄されます。たとえば、トランザクションはいずれかの操作に失敗すると中止され、トランザクションで行われたすべてのデータ変更は、表示されることなく破棄されます。

重要

ほとんどの場合、分散トランザクションでは 1 つのドキュメントの書込み (write) よりもパフォーマンス コストが高くなります。分散トランザクションの可用性は、効果的なスキーマ設計の代わりにはなりません。多くのシナリオにおいて、非正規化されたデータモデル(埋め込みドキュメントと配列)が引き続きデータやユースケースに最適です。つまり、多くのシナリオにおいて、データを適切にモデリングすることで、分散トランザクションの必要性を最小限に抑えることができます。

トランザクションの使用に関するその他の考慮事項(ランタイム制限や oplog サイズ制限など)については、「本番環境での考慮事項」も参照してください。

Tip

以下も参照してください。

分散トランザクションは、複数の操作、コレクション、データベース、ドキュメント、シャードにわたって使用できます。

トランザクションについて:

  • トランザクション内でコレクションとインデックスを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。

  • トランザクションで使用されるコレクションは、異なるデータベースにある可能性があります。

    注意

    クロスシャードの書き込みトランザクションでは新しいコレクションを作成できません。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。

  • Capped コレクションには書き込めません。

  • Capped コレクションから読み取る場合、読み取り保証(read concern)"snapshot" は使用できません。(MongoDB 5.0 以降)

  • config データベース、 admin データベース、または local データベース内のコレクションの読み取りと書き込みはできません。

  • system.* コレクションに書込み (write) はできません。

  • explain または類似コマンドを使用して、サポートされている操作のクエリプランを返すことはできません。

  • トランザクションの外部で作成されたカーソルの場合、トランザクション内で getMore を呼び出せません。

  • トランザクション内で作成されたカーソルの場合、トランザクション外で getMore を呼び出せません。

トランザクションでサポートされない操作のリストについては、「制限付き操作」を参照してください。

Tip

トランザクションを開始する直前にコレクションを作成または削除する際に、トランザクション内のコレクションにアクセスする場合、書込み保証(write concern)付きで "majority" に作成または削除操作を発行して、トランザクションが必要なロックを取得できるようにします。

Tip

以下も参照してください。

トランザクションがクロスシャード書込みトランザクションでない場合、分散トランザクションで次の操作を実行できます。

  • コレクションを作成する。

  • 同じトランザクション内で以前に作成された新しい空のコレクションにインデックスを作成する

トランザクション内でコレクションを作成する際、以下を実行できます。

トランザクション内でインデックスを作成する場合 [1]、作成するインデックスは次のいずれかに配置される必要があります。

  • 存在しないコレクション。コレクションは、操作中に作成されます。

  • 同じトランザクション内で以前に作成された新しい空のコレクション。

[1] 既存インデックスに対して db.collection.createIndex()db.collection.createIndexes() を実行して、その有無を確認することもできます。これらの操作は、インデックスを作成せずに正常に返されます。
  • クロスシャードの書き込みトランザクションでは新しいコレクションを作成できません。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。

  • トランザクション内でコレクションまたはインデックスを明示的に作成する場合、トランザクションの読み取り保証(read concern)は "local" でなければなりません。

    コレクションとインデックスを明示的に作成するには、次のコマンドとメソッドを使用します。

    コマンド
    方式

Tip

以下も参照してください。

トランザクション内でカウント操作を実行するには、$count 集計ステージまたは $group$sum式を使用) 集計ステージを使用します。

MongoDB ドライバーは、$group$sum 式を併用してカウントを実行するヘルパー メソッドとして、コレクションレベルの API countDocuments(filter, options)を提供します。

mongosh には、$group$sum 式と併用してカウントを実行する db.collection.countDocuments() ヘルパー メソッドが用意されています。

トランザクション内で個別の操作を実行するには、以下を使用できます。

  • シャーディングされていないコレクションの場合、db.collection.distinct() メソッドまたは distinct コマンド、および 集計パイプラインを $group ステージと併用できます。

  • シャーディングされたコレクションの場合、db.collection.distinct() メソッド、または distinct コマンドは使用できません。

    シャーディングされたコレクションの個別の値を検索するには、代わりに $group ステージで集計パイプラインを使用します。以下に例を挙げます。

    • db.coll.distinct("x") の代わりに以下を使用します

      db.coll.aggregate([
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])
    • db.coll.distinct("x", { status: "A" }) の代わりに以下を使用します。

      db.coll.aggregate([
      { $match: { status: "A" } },
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])

    パイプラインはドキュメントにカーソルを返します。

    { "distinctValues" : [ 2, 3, 1 ] }

    カーソルを反復して、結果ドキュメントにアクセスします。

情報提供コマンドには、hellobuildInfoconnectionStatus(およびこれらのヘルパー メソッド)などがあり、トランザクションに含めることができますが、最初の操作になることはできません。

次の操作はトランザクションでは許可されません。

  • クロスシャードの書き込みトランザクションで新しいコレクションを作成します。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。

  • コレクションの明示的な作成db.createCollection()メソッドやインデックスの明示的な作成(db.collection.createIndexes() および db.collection.createIndex() メソッドなど)で、"local" 以外の読み取り保証(read concern)レベルを使用する場合

  • listCollections コマンドと listIndexes コマンド、およびそのヘルパー メソッド。

  • その他の CRUD 操作や情報操作以外の操作、例えば createUsergetParametercount などおよびその補助ツール。

  • トランザクションはセッションに関連付けられます。

  • 1 セッションで一度に開くことができるトランザクションは 1 つまでです。

  • ドライバーを使用する場合、トランザクション内の各操作はセッションに関連付けられる必要があります。詳細については、ドライバー固有のドキュメントを参照してください。

  • セッションが終了し、そのセッションにオープン トランザクションがある場合、そのトランザクションは中止されます。

トランザクション内の操作では、トランザクションレベルの読み込み設定(read preference)が使用されます。

ドライバーを使用すると、トランザクション開始時にトランザクションレベルの読み込み設定(read preference)を設定できます。

  • 読み込み設定(read preference)がトランザクションレベルで設定されていない場合、セッションレベルの読み込み設定がトランザクションで使用されます。

  • 読み込み設定(read preference)がトランザクションとセッションのいずれのレベルでも設定されていない場合、クライアントレベルの読み込み設定がトランザクションで使用されます。デフォルトでは、クライアントレベルの読み込み設定が primary となります。

読み取り操作を含む分散トランザクションでは、読み込み設定(read preference)primary を使用する必要があります。特定のトランザクション内のすべての操作は、同じノードにルーティングする必要があります。

トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)が使用されます。つまり、コレクションレベルとデータベースレベルで設定された読み取り保証は、トランザクション内では無視されます。

トランザクションの開始時に、トランザクションレベルの読み取り保証(read concern)を設定できます。

トランザクションは次の読み取り保証(read concern)レベルをサポートします。

  • 読み取り保証(read concern)"local" は、ノードから入手可能な最新データを返しますが、ロールバックも可能です。

  • レプリカセットでは、トランザクションが読み取り保証localを使用している場合でも、トランザクションが開かれた時点でのスナップショットから操作が読み取られる場合、より強力な読み取り分離が見られる可能性があります。

  • シャーディングされたクラスター上のトランザクションの場合、"local" 読み取り保証(read concern)では、シャード全体でデータが同じスナップショット ビューから取得されることを保証できません。スナップショットの分離が必要な場合は、"snapshot" 読み取り保証(read concern)を使用してください。

  • トランザクション内でコレクションとインデックスを作成できます。コレクションまたはインデックスを明示的に作成する場合、トランザクションで読み取り保証 "local" を使用する必要があります。コレクションを暗黙的に作成する場合は、トランザクションに任意の読み取り保証を使用できます。

  • トランザクションが「majority 」の書込み保証(write concern)でコミットされた場合、読み取り保証(read concern)"majority" はレプリカセット ノードの過半数によって確認済みであり、かつロールバックできないデータを返します。それ以外の場合、読み取り保証 "majority" では読み取り操作によって過半数のコミット済みデータが読み取られる保証はありません。

  • シャーディングされたクラスター上のトランザクションの場合、読み取り保証(read concern)"majority" では、シャード全体でデータが同じスナップショット ビューから取得されることを保証できません。スナップショットの分離が必要な場合は、読み取り保証 "snapshot" を使用します。

  • トランザクションが「majority 」の書込み保証(write concern)でコミットされる場合、読み取り保証(read concern) "snapshot" はコミットされた過半数のデータのスナップショットからのデータを返します。

  • トランザクションが「過半数」の書込み保証(write concern)をコミットに使用しない場合、"snapshot" 読み取り保証(read concern)は、読み取り操作によってコミットされた過半数のデータのスナップショットが使用されたことを保証しません

  • シャーディングされたクラスター上のトランザクションの場合、データの "snapshot" ビューはシャード間で同期されます

トランザクションは、トランザクションレベルの書込み保証(write concern)を使用して書込み操作をコミットします。トランザクション内の書込み操作は、明示的な書込み保証を指定せずに、デフォルトの書込み保証を使用して実行される必要があります。コミット時に、書込み操作はトランザクションレベルの書込み保証を使用します。

Tip

トランザクション内の個々の書込み操作に書込み保証(write concern)を明示的に設定しないでください。トランザクション内の個々の書込み操作に書込み保証を設定すると、エラーが返されます。

トランザクションの開始時に、トランザクションレベルの書込み保証(write concern)を設定できます。

  • トランザクションレベルの書込み保証(write concern)の設定が解除されている場合、トランザクションレベルの書込み保証はデフォルトでセッションレベルの書込み保証でコミットされます。

  • トランザクションレベルとセッションレベルのいずれでも書込み保証(write concern)の設定が解除されている場合、トランザクションレベルの書込み保証はデフォルトでクライアントレベルの書込み保証になります。

トランザクションは、以下を含むすべての書込み保証(write concern)の w 値をサポートします。

  • 書込み保証(write concern)w: 1 は、コミットがプライマリに適用された後に確認応答を返します。

    重要

    w: 1 でコミットする場合、トランザクションはフェイルオーバーが発生した場合にロールバック可能です。

  • w: 1 書込み保証(write concern)でコミットする場合、トランザクションレベルの "majority" 読み取り保証(read concern)では、トランザクション内の読み取り操作が過半数でコミットされたデータを読み取るという保証は ありません

  • w: 1 書込み保証(write concern)でコミットする場合、トランザクションレベルの "snapshot" 読み取り保証(read concern)では、トランザクション内の読み取り操作で過半数でコミットされたデータのスナップショットが使用されるという保証は ありません

  • w: "majority" の書込み保証(write concern)は、コミットが投票ノードの過半数に適用された後に確認応答を返します。

  • w: "majority" 書込み保証(write concern) でコミットする場合、トランザクションレベルの "majority" 読み取り保証(read concern)により、操作によって過半数でコミットされたデータが読み取られたことが保証されます。シャーディングされたクラスター上のトランザクションでは、コミットされた過半数のデータのこのビューはシャード間で同期されません。

  • w: "majority" 書込み保証(write concern)でコミットする場合、トランザクションレベルの "snapshot" 読み取り保証(read concern)により、操作によって過半数でコミットされたデータの同期されたスナップショットから読み取られたことが保証されます。

注意

トランザクションに指定された書込み保証(write concern)にかかわらず、シャーディングされたトランザクションに対するコミット操作の一部には、{w: "majority", j: true} 書込み保証を使用する部分が含まれます。

サーバー パラメーター coordinateCommitReturnImmediatelyAfterPersistingDecision は、トランザクションをコミットする決定がクライアントに返されるタイミングを制御します。

このパラメーターは MongDB 5.0 で導入されたもので、デフォルト値はtrueです。 MongoDB 5.0.10 ではデフォルト値がfalseに変更されています。

coordinateCommitReturnImmediatelyAfterPersistingDecisionfalse の場合、シャード トトランザクションの調整役は、すべてのノードがマルチドキュメントトランザクションのコミットを確認するまで待機してから、コミットの決定をクライアントに返します。

マルチドキュメントトランザクション に対して"majority" 書込み保証(write concern)を指定した場合に、 レプリカセット ノード の 計算された過半数 へのレプリケート トランザクションに失敗すると、そのトランザクションはレプリカセット ノードに対してすぐにロールバックされない可能性があります。レプリカセットは結果整合性が得られます。 トランザクションは常にすべてのレプリカセット メンバーに適用またはロールバックされます。

トランザクションに指定された書込み保証(write concern)にかかわらず、ドライバーは commitTransaction の再試行時に書込み保証として w: "majority" を適用します。

以下の各セクションでは、トランザクションに関する詳細な考慮事項を取りあげています。

本番環境でのトランザクションについては、「本番環境での考慮事項」を参照してください。シャーディングされたクラスターについては、「本番環境での考慮事項(シャーディングされたクラスター)」も参照してください。

レプリカセットにアービタがある場合、トランザクションを使用してシャードキーを変更することはできません。 アービタは、 マルチシャード トランザクションに必要なデータ操作に参加することはできません。

書込み操作が複数のシャードにまたがるトランザクションで、アービタを含むシャードを対象に読み取りまたは書込み操作が実行される場合、そのトランザクションはエラーとなり中止します。

シャーディングされたクラスターのうち writeConcernMajorityJournalDefaultfalse に設定されているもの(インメモリ ストレージエンジンを使用する投票ノードのあるシャードなど)ではトランザクションを実行できません。

注意

トランザクションに指定された書込み保証(write concern)にかかわらず、シャーディングされたトランザクションに対するコミット操作の一部には、{w: "majority", j: true} 書込み保証を使用する部分が含まれます。

トランザクションのステータスとメトリクスを取得するには、次のメソッドを使用します。

ソース
戻り値

トランザクション メトリクスを返します。

注意

MongoDB Atlas M 0 /M 2 /M 5クラスターでは一部のserverStatus応答フィールドは返されません。 詳細については、MongoDB Atlas ドキュメントの「制限されたコマンド」を参照してください。

$currentOp 集計パイプライン

次の値を返します。

次の値を返します。

mongod および mongos のログ メッセージ
低速(operationProfiling.slowOpThresholdMs のしきい値を超える)トランザクションに関する情報を TXN ログ コンポーネントに含めます。

トランザクションを使用するには、配置のすべてのノードの FeatureCompatibilityVersion が次のバージョン以上である必要があります。

配置
最小 featureCompatibilityVersion
レプリカセット
4.0
シャーディングされたクラスター
4.2

ノードの機能の互換性バージョンを確認するには、該当ノードに接続して次のコマンドを実行します。

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

詳細については、setFeatureCompatibilityVersion に関する参考ページを参照してください。

分散トランザクションは次のレプリカセットとシャーディングされたクラスターでサポートされます。

  • プライマリが WiredTiger ストレージエンジンを使用しており、かつ

  • セカンダリ ノードが、WiredTiger ストレージエンジンまたはインメモリ ストレージエンジンのいずれかを使用している。

戻る

update