Understanding transactions in Scala

I’m looking at the example given here (for which a snippet is given here). Some points of confusion:

  1. The session is never closed. Is this safe? What are the semantics for this?
  2. The updateOne and insertOne Observables don’t appear to be connected to the transaction commit Observable in any way (e.g., via a flatMap). So:
    2a. Does this mean that the observed results of those commands may not actually get applied (e.g., on rollback)?
    2b. What prevents the transaction from being committed before the commands have finished? Similarly, if I chain further commands off of them, how does the txn commit know to wait for them?
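
To make the ordering concern in 2b concrete, here is a minimal, driver-free sketch. It uses scala.concurrent.Future as a stand-in for the driver's Observable, and the names slowWrite, commit, detached, and chained are hypothetical, not driver APIs. It only illustrates the general point that a later step does not wait for an asynchronous step unless the two are chained (e.g., with flatMap); it makes no claim about how the real driver schedules commands.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TxnOrderingSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for insertOne/updateOne: it records "write"
  // only once `gate` completes, which models a command that takes a while.
  def slowWrite(log: ListBuffer[String], gate: Future[Unit]): Future[Unit] =
    gate.map { _ => log.synchronized { log += "write" }; () }

  // Hypothetical stand-in for commitTransaction: records "commit" when it runs.
  def commit(log: ListBuffer[String]): Future[Unit] =
    Future { log.synchronized { log += "commit" }; () }

  // The write is started but nothing links it to the commit,
  // so the commit finishes while the write is still pending.
  def detached(): List[String] = {
    val log   = ListBuffer.empty[String]
    val gate  = Promise[Unit]()
    val write = slowWrite(log, gate.future)
    Await.result(commit(log), 5.seconds) // does not wait for `write`
    gate.success(())                     // only now may the write finish
    Await.result(write, 5.seconds)
    log.synchronized(log.toList)
  }

  // The commit is chained off the write with flatMap,
  // so it cannot start until the write has completed.
  def chained(): List[String] = {
    val log  = ListBuffer.empty[String]
    val gate = Promise[Unit]()
    val done = slowWrite(log, gate.future).flatMap(_ => commit(log))
    gate.success(())
    Await.result(done, 5.seconds)
    log.synchronized(log.toList)
  }
}
```

In this model, detached() records the commit before the write, while chained() records the write first. Whether the same holds for Observables in the driver is exactly what the question is asking.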

I’m sure I’m missing something fundamental, but I’m not sure what it is. Here’s a snippet of code showing what I’m trying to do:

object MongoHelper {
  def runTxn[A](client: MongoClient)(txn: ClientSession => A): Observable[Unit] = {
    client
      .startSession()
      .map { session =>
        session.startTransaction()
        txn(session)
        session
      }.flatMap { session =>
        session.commitTransaction().map { _ =>
          println("Committed transaction")
          session.close()
        }
      }
  }
}

...

  def txnTest = {
    val obs = MongoHelper.runTxn(client) { session =>
      println("Inserting")
      col.insertOne(session, Foo(5)).subscribe(res => println(s"Inserted: $res"))
    }
    Await.result(obs.head, 5.seconds)
  }

I never get the “Inserted” message.

Thanks!

Or, even better:

  def runTxn[A](client: MongoClient)(txn: ClientSession => Observable[A]): Observable[A] = {
    for {
      session <- client.startSession()
      _ = session.startTransaction()
      res <- txn(session)
      _   <- session.commitTransaction()
      _ = session.close()
    } yield res
  }

Okay, so it works if I write it more intuitively, like this:

  def runTxn[A](client: MongoClient)(txn: ClientSession => Observable[A]): Observable[ClientSession] = {
    for {
      session <- client.startSession()
      _ = session.startTransaction()
      _ <- txn(session)
      _ <- session.commitTransaction()
    } yield {
      session.close()
      session
    }
  }
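
One gap in the version above is that session.close() only runs when both the txn and the commit succeed. As a rough sketch of closing on both paths, the snippet below again uses scala.concurrent.Future as a stand-in for Observable. FakeSession is a hypothetical placeholder for ClientSession that only tracks flags, so this shows the shape of the cleanup, not the driver's actual behaviour.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SessionCleanupSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for ClientSession: it only records what was called.
  final class FakeSession {
    @volatile var committed = false
    @volatile var closed    = false
    def commit(): Future[Unit] = Future { committed = true }
    def close(): Unit          = closed = true
  }

  def runTxn[A](session: FakeSession)(txn: FakeSession => Future[A]): Future[A] = {
    val result = for {
      res <- txn(session)      // run the caller's operations first
      _   <- session.commit()  // commit only after they have succeeded
    } yield res
    // andThen runs its side effect on success and on failure,
    // then passes the original result (or error) through unchanged.
    result.andThen { case _ => session.close() }
  }
}
```

With this shape, a failing txn skips the commit but still closes the session. Aborting the transaction on failure is left out to keep the sketch short.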

But the question remains: why is the example written the way it is? If you look at the code, you’ll see that it boils down to my code above:

  def runTxn[A](client: MongoClient)(txn: ClientSession => A): Observable[Unit] = {
    client
      .startSession()
      .map { session =>
        session.startTransaction()
        txn(session)
        session
      }.flatMap { session =>
        session.commitTransaction().map { _ =>
          println("Committed transaction")
          session.close()
        }
      }
  }

In particular:

  • The Observable(s) generated by the txn (an updateOne / insertOne in the example) are “thrown away” instead of being (flat)mapped over as in my example above, so the transaction commit is disconnected from them.
  • The session is never closed.

I just don’t see how the provided example can possibly work. Can someone from the team please set me straight?

(It looks like the mods reordered my last two posts.)

Ugh, one last thing. I mean that in the provided example on your site (here), the session is never closed. I fixed that in my examples, but otherwise left your code as-is, to demonstrate the main problem (transaction disconnected from updates).