トランザクションの実行
Overview
このガイドでは、 Scalaドライバーを使用して トランザクション を実行する方法を学習できます。トランザクションを使用すると、トランザクション全体がコミットされた場合にのみ、データを変更する一連の操作を実行できます。トランザクション内のいずれかの操作が成功しない場合、ドライバーはトランザクションを停止し、変更が反映される前にすべてのデータ変更を破棄します。この特徴は アトミック性と 呼ばれます。
MongoDBでは、トランザクションは論理 セッション 内で実行されます。セッションは 、順番に実行されるよう関連付けられた読み取り操作または書き込み操作のグループです。セッションにより、一連の操作に対する 因果整合性 が有効になり、 ACID準拠のトランザクション内で操作を実行できるようになります。これは、アトミック性、整合性、分離、 耐久性 の期待を満たすトランザクションです。 MongoDBは、トランザクション操作で予期せぬエラーが発生した場合でも、その操作に関わるデータの一貫性が保たれることを保証します。
Scalaドライバーを使用する場合、クライアントで startSession()
メソッドを呼び出すことで ClientSession
を起動できます。その後、セッション内でトランザクションを実行できるようになります。
警告
ClientSession
は、それを作成したMongoClient
で実行されている操作でのみ使用します。 ClientSession
と別のMongoClient
を使用すると、操作エラーが発生します。
メソッド
startSession()
メソッドを呼び出してセッションを開始した後、ClientSession
クラスのメソッドを使用してセッション状態を変更できます。次の表では、トランザクションを管理するために使用できる方法について説明します。
方式 | 説明 |
---|---|
| Starts a new transaction on this session. You cannot start a
transaction if there's already an active transaction running in
the session. You can set transaction options by passing a TransactionOptions
instance as a parameter. |
| Commits the active transaction for this session. This method returns an
error if there is no active transaction for the session, the
transaction was previously ended, or if there is a write conflict. |
| Ends the active transaction for this session. This method returns an
error if there is no active transaction for the session or if the
transaction was committed or ended. |
トランザクションの例
この例では、 sample_mflix
データベースのコレクション内のデータを変更する runTransaction()
メソッドを定義しています。このコードは、次のアクションを実行します。
movies
コレクションとusers
コレクションにアクセスするためのMongoCollection
インスタンスを作成しますトランザクションの読み取り保証(read concern)と書込み保証 (write concern)を指定します
トランザクションを開始する
movies
コレクションにドキュメントを挿入し、結果を出力しますusers
コレクション内のドキュメントを更新し、結果を出力します
def runTransaction( database: MongoDatabase, observable: SingleObservable[ClientSession] ): SingleObservable[ClientSession] = { observable.map(clientSession => { val moviesCollection = database.getCollection("movies") val usersCollection = database.getCollection("users") val transactionOptions = TransactionOptions .builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() // Starts the transaction with specified options clientSession.startTransaction(transactionOptions) // Inserts a document into the "movies" collection val insertObservable = moviesCollection.insertOne( clientSession, Document("name" -> "The Menu", "runtime" -> 107) ) val insertResult = Await.result(insertObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Insert completed: $insertResult") // Updates a document in the "users" collection val updateObservable = usersCollection.updateOne( clientSession, equal("name", "Amy Phillips"), set("name", "Amy Ryan") ) val updateResult = Await.result(updateObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Update completed: $updateResult") clientSession }) }
注意
トランザクション内では、操作は順番に実行される必要があります。上記のコードは、各書き込み操作の結果を待機して、操作が同時に実行されないようにします。
次に、次のコードを実行してトランザクションを実行します。 このコードは、次のアクションを完了します。
startSession()
メソッドを使用してクライアントからセッションを作成しますデータベースとセッションをパラメータとして渡して、前述の例で定義された
runTransaction()
メソッドを呼び出しますcommitTransaction()
メソッドを呼び出してトランザクションをコミットし、操作が完了するのを待機します
val client = MongoClient("<connection string>") val database = client.getDatabase("sample_mflix") val session = client.startSession(); val transactionObservable: SingleObservable[ClientSession] = runTransaction(database, session) val commitTransactionObservable: SingleObservable[Unit] = transactionObservable.flatMap(clientSession => clientSession.commitTransaction()) Await.result(commitTransactionObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
Insert completed: AcknowledgedInsertOneResult{insertedId=BsonObjectId{value=...}} Update completed: AcknowledgedUpdateResult{matchedCount=1, modifiedCount=1, upsertedId=null}
詳細情報
このガイドで言及されている概念の詳細については、 MongoDB Serverマニュアルの次のページ を参照してください。
ACID コンプライアンスの詳細については、 MongoDBウェブサイトの「 データベース管理システムのACIDプロパティに関するガイド 」に関する記事を参照してください。
挿入操作の詳細については、ドキュメントの挿入ガイドを参照してください。
API ドキュメント
このガイドで言及されているメソッドとタイプの詳細については、次のAPIドキュメントを参照してください。