JetBrains / Exposed

Kotlin SQL Framework
http://jetbrains.github.io/Exposed/
Apache License 2.0
8.37k stars 694 forks source link

Inner suspended transaction doesn't roll back when outer transaction throws #1586

Open pablo-silverback opened 2 years ago

pablo-silverback commented 2 years ago

In a context of suspended parent/child transactions, when there's an error after the child block ended the transaction doesn't roll back.

Pseudocode:

newSuspendedTransaction {
  insert(A)
  newSuspendedTransaction {
    insert(B)
  }
  throw Exception()
}

In this scenario, B is inserted on the database (transaction doesn't roll back).

Note that replacing newSuspendedTransaction with transaction (not suspended) works as expected.

Here's a complete working test that illustrates the issue:

```kotlin object ExampleTable : LongIdTable("transaction_example_table") { private val value = text("value") fun add(v: String) = ExampleTable.insert { it[value] = v } fun getAll(): List = selectAll().map { it[value] } } class TransactionTests { companion object { private lateinit var db: Database @JvmStatic @BeforeAll fun setup() { val ds = PGSimpleDataSource() ds.setUrl("jdbc:postgresql://localhost:5432/test_db?user=user&password=pass") db = Database.connect(ds as DataSource) transaction(db) { SchemaUtils.drop(ExampleTable) SchemaUtils.create(ExampleTable) } } } @BeforeEach fun deleteAll() { transaction(db) { ExampleTable.deleteAll() } } private fun assertNoRows() = Assertions.assertEquals(0, ExampleTable.selectAll().count()) @Test fun `inner new suspended transactions don't rollback when outer throws`(): Unit = runBlocking { db.useNestedTransactions = true // this doesn't seem to change anything try { newSuspendedTransaction(Dispatchers.Default, db) { assertNoRows() ExampleTable.add("outer") newSuspendedTransaction(Dispatchers.Default, db) { ExampleTable.add("inner") } throw Exception("outer transaction throws") } } catch (e: Exception) { transaction(db) { // this is the problem // assertNoRows() -> this would fail Assertions.assertEquals("inner", ExampleTable.getAll().single()) } } } } ```
thamidu commented 2 years ago

It's somewhere in the documentation.

The reason is when creating a suspendedTransaction, it starts a transaction as a separate new one. So it really doesn't do anything if you start a txn inside another txn. It always will be a separate one. So in this case, the child's txn will still be complete while the parent gets rollback.

This is a known issue in this framework. The only thing you can do is use threaded-based web frameworks like Servelets, and Spring MVC or wait till the Exposed RDBC implementation which is currently under development.

Edit: As @AlexeySoshin it's not an issue. Still, you can use JDBC with coroutines, but the downside is it blocks the thread till the SQL server returns the result. I did some digging and posted about that in the comments down below. You can have a look if you are interested. Sorry for the confusion.

AlexeySoshin commented 2 years ago

@pablo-silverback Thank you for providing a test case to reproduce.

Could you try the following code, please, and tell me if this is the behavior you were expecting?

newSuspendedTransaction(Dispatchers.Default, db) {
    assertNoRows()
    ExampleTable.add("outer")

    suspendedTransaction {
        ExampleTable.add("inner")
    }
    throw Exception("outer transaction throws")
}
pablo-silverback commented 2 years ago

@AlexeySoshin thanks! that fixes the test case. The problem is that our code is not that simple and it looks more like this:

newSuspendedTransaction(Dispatchers.Default, db) {
    assertNoRows()
    ExampleTable.add("outer")
    insertInner()
    throw Exception("outer transaction throws")
}

private fun insertInner() {
  suspendedTransaction { // -> obviously doesn't compile
    ExampleTable.add("inner")
  }
}

That's not an issue with transaction since we can reference the db object. What's the "exposed way" of running nested suspended transactions that may span multiple methods?

pablo-silverback commented 2 years ago

@AlexeySoshin Also, if it's a known issue like @thamidu points out, I'm happy to work on a fix for it (with some pointers of where to look). We rely heavily on exposed and would love to help improve it 🤗 (instead of work around it)

AlexeySoshin commented 2 years ago

@pablo-silverback

Thank you for the proposal, and contributions would be more than welcome, but I disagree with @thamidu that it's an issue at all.

The reason the above code doesn't compile is that:

  1. The function is not suspended, and suspendedTransactions requires a suspended context
  2. suspendedTransaction is declared an extension function on Transaction

The following code compiles and works as expected:

    private suspend fun Transaction.insertInner() {
        suspendedTransaction { // -> compiles
            ExampleTable.add("inner")
        }
    }    

Hope that helps.

lucas-avila-aside commented 2 years ago

@AlexeySoshin hi 👋 Yes, that's right. With an extension method would work but what do you think about the following scenario:

suspend fun A() {
   newSuspendedTransaction(Dispatchers.Default, db) {
      assertNoRows()
      ExampleTable.add("outer")
      insertInner()
  }
}

private suspend fun Transaction.insertInner() {
        suspendedTransaction {
            ExampleTable.add("inner")
        }
    } 

And then let's say you have somewhere, in another class/module/package/etc:

suspend fun B() {
   newSuspendedTransaction(Dispatchers.Default, db) {
      A()
      ... // more code
      throw Exception("outer outer transaction throws")
  }
}

In this case A() won't be rollbacked because it's creating a newSuspendedTransaction.

I think there are ways of structuring the code to try to work-around this but wouldn't it be nice for Exposed to notice that if the transaction is inside another one it has to treat it as the extension case?

pablo-silverback commented 2 years ago

@AlexeySoshin We have scenarios where insertInner is used as a standalone method (without a parent transaction). This is why we have nested newSuspendedTransactions. Exposed works seamlessly with non-supspended transaction calls for these scenarios and, as I understand it, there is no suspended equivalent.

My question then would be, what's the recommended way of dealing with this scenario (suspended tx calls that may or may not be nested)?

leoneparise commented 2 years ago

I had this scenario and created an utility function which to solve it:

suspend fun <T> inTransaction(
    transaction: Transaction? = null,
    context: CoroutineDispatcher = Dispatchers.IO,
    db: Database? = null,
    transactionIsolation: Int? = null,
    statement: suspend (Transaction) -> T
): T = transaction
    ?.suspendedTransaction(
        context = context,
        statement = { statement(this) }
    )
    ?: newSuspendedTransaction(
        context = context,
        db = db,
        transactionIsolation = transactionIsolation,
        statement = { statement(this) }
    )
pablo-silverback commented 2 years ago

@leoneparise thanks! using that definition of inTransaction this test case passes (when it should not)

try {
    inTransaction(context = Dispatchers.Default, db = db) {
        assertNoRows()
        ExampleTable.add("outer")

        inTransaction(context = Dispatchers.Default, db = db) {
            ExampleTable.add("inner")
        }
        throw Exception("outer transaction throws")
    }
} catch (e: Exception) {
    transaction(db) {
        Assertions.assertEquals("inner", ExampleTable.getAll().single())
    }
}

I assume the second call it's expected to be inTransaction(transaction = it, context = Dispatchers.Default, db = db) but then we're at square one again: the caller of inTransaction needs to know if it's running inside another transaction or not.

Note this is not required with the ordinary transaction call.

thamidu commented 2 years ago

I did some dig up on how those things works. So I recreated all of the above codes and compared them with the SQL query log of my MariaDB Server. Those are my findings.

Specs:

MariaDB - 10.2.11 Ktor - 2.0.1 Exposed - 0.38.2 mysql-connector-java - 8.0.29 HikariCP - 5.0.1

Hikari Connection Pooling with default properties and the additional properties below

cachePrepStmts = true
prepStmtCacheSize = 250
prepStmtCacheSqlLimit = 2048

Case 01 (Initial question)

Code -

suspend fun testNested(): Nothing {
    newSuspendedTransaction {
        Students.selectAll().count()

        newSuspendedTransaction {
            Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
        }

        throw Exception()
    }
}

MariaDB Log -

Id Command  Argument

12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
13 Query    SET autocommit=0
13 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query    UPDATE students SET status=0 WHERE students.id = 1
13 Query    commit
13 Query    SET autocommit=1
12 Query    rollback
12 Query    SET autocommit=1

Conclusion - As I mentioned in my previous comment, when you call (create) newSuspendedTransaction { } inside a parent txn, it will start as a separate txn. That's why the parent txn started in thread 12 and the child txn started in thread 13. That's why the child txn gets complete while the parent gets rollback.

Case 2 (@AlexeySoshin Solution)

Code -

suspend fun testNested(): Nothing {
    newSuspendedTransaction {
        Students.selectAll().count()

        suspendedTransaction {
            Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
        }

        throw Exception()
    }
}

MariaDB Log -

Id Command  Argument

12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
12 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
12 Query    SET autocommit=1

Conclusion - This is the way. When you declare a suspendedTransaction { } inside a newSuspendedTransaction { }, it will takes the parent txn (scope's transaction) as it's txn. So basically the child txn's code will be merged into the parent's txn, which results a single txn. That's why there is not any other separate thread for the child txn. It's all in thread 12. So if there's any error at some point, the whole code will roll back.

Case 3 (@leoneparise solution)

This also output the same as Case 2, but with an exception. You need to pass the parent's txn to the child. as mentioned in

I assume the second call it's expected to be inTransaction(transaction = it, context = Dispatchers.Default, db = db) but then we're at square one again: the caller of inTransaction needs to know if it's running inside another transaction or not.

My implementation

I also have this same scenario just like yours. In my project, I also have some functions which calls directly and inside another function. So here is my implementation which also fixes the issue you raised and does the same function as Case 2 and 3

suspend fun <T> susTxn(
    db: Database? = null,
    statement: suspend (Transaction) -> T
): T = withContext(Dispatchers.IO){
    TransactionManager.currentOrNull()
        ?.let { it.suspendedTransaction { statement(this) } }
        ?: newSuspendedTransaction(db = db) { statement(this) }
}

In this case, you don't have to pass parent's transaction or whats or ever. You can use your functions as a standalone or inside another transaction. Please note that this implementation has not been tested. So please, use it with caution. I am not sure about how it will behave under many requests concurrently. But currently, we use this method in our development builds (not released to production yet) to fetch/persist data. No issue so far.

Test Case (For the solution above with parrarel requests)

Code -

// KTor Route
get("/test") {
    testNested()
    call.respond(HttpStatusCode.OK)
}

suspend fun testNested() = susTxn {
    Students.selectAll().count()

    innerTxn()

    throw Exception("Exception occurred")
    null
}

// Here, the innerTxn can be called separately too.
suspend fun innerTxn() = susTxn {
    Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
}
# Will send 10 concurrent requests

xargs -I % -P 10 curl -X GET -v http://localhost:8080/test < <(printf '%s\n' {1..10})

MariaDB Log -

Id Command  Argument
12 Query    SET autocommit=0
19 Query    SET autocommit=0
13 Query    SET autocommit=0
13 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query    SELECT COUNT(*) FROM students
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query    SELECT COUNT(*) FROM students
13 Query    UPDATE students SET status=0 WHERE students.id = 1
19 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    SET autocommit=0
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
15 Query    SELECT COUNT(*) FROM students
14 Query    SET autocommit=0
12 Query    UPDATE students SET status=0 WHERE students.id = 1
14 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
14 Query    SELECT COUNT(*) FROM students
13 Query    rollback
18 Query    SET autocommit=0
17 Query    SET autocommit=0
18 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query    SELECT COUNT(*) FROM students
15 Query    UPDATE students SET status=0 WHERE students.id = 1
19 Query    rollback
17 Query    UPDATE students SET status=0 WHERE students.id = 1
18 Query    SELECT COUNT(*) FROM students
14 Query    UPDATE students SET status=0 WHERE students.id = 1
18 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
19 Query    SET autocommit=1
12 Query    SET autocommit=1
13 Query    SET autocommit=1
15 Query    rollback
14 Query    rollback
17 Query    rollback
18 Query    rollback
14 Query    SET autocommit=1
15 Query    SET autocommit=1
17 Query    SET autocommit=1
18 Query    SET autocommit=1
12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
16 Query    SET autocommit=0
16 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
16 Query    SELECT COUNT(*) FROM students
15 Query    SET autocommit=0
16 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    UPDATE students SET status=0 WHERE students.id = 1
16 Query    rollback
15 Query    SELECT COUNT(*) FROM students
12 Query    rollback
16 Query    SET autocommit=1
15 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    rollback
15 Query    SET autocommit=1
12 Query    SET autocommit=1

MariaDB Log (sorted by thread ID) -

12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
12 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
12 Query    SET autocommit=1
12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
12 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
12 Query    SET autocommit=1
13 Query    SET autocommit=0
13 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query    SELECT COUNT(*) FROM students
13 Query    UPDATE students SET status=0 WHERE students.id = 1
13 Query    rollback
13 Query    SET autocommit=1
14 Query    SET autocommit=0
14 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
14 Query    SELECT COUNT(*) FROM students
14 Query    UPDATE students SET status=0 WHERE students.id = 1
14 Query    rollback
14 Query    SET autocommit=1
15 Query    SET autocommit=0
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
15 Query    SELECT COUNT(*) FROM students
15 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    rollback
15 Query    SET autocommit=1
15 Query    SET autocommit=0
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
15 Query    SELECT COUNT(*) FROM students
15 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    rollback
15 Query    SET autocommit=1
16 Query    SET autocommit=0
16 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
16 Query    SELECT COUNT(*) FROM students
16 Query    UPDATE students SET status=0 WHERE students.id = 1
16 Query    rollback
16 Query    SET autocommit=1
17 Query    SET autocommit=0
17 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query    SELECT COUNT(*) FROM students
17 Query    UPDATE students SET status=0 WHERE students.id = 1
17 Query    rollback
17 Query    SET autocommit=1
18 Query    SET autocommit=0
18 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
18 Query    SELECT COUNT(*) FROM students
18 Query    UPDATE students SET status=0 WHERE students.id = 1
18 Query    rollback
18 Query    SET autocommit=1
19 Query    SET autocommit=0
19 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query    SELECT COUNT(*) FROM students
19 Query    UPDATE students SET status=0 WHERE students.id = 1
19 Query    rollback
19 Query    SET autocommit=1    

Conclusion - As you can see, all of those 10 requests got processed and got rolledbacked. But I'm little bit skeptical about some threads (aka DB Connections) executed multiple transactions. Im not sure, whether Hikari reused those connections once a transaction is processed / or queries got mixed up. But my bet is Hikari reused those connections.

Hope this helps you to get a basic understanding and solves your issue. Please let me know any corrections to be made.

AlexeySoshin commented 2 years ago

That's a great analysis, @thamidu, thanks for that!

pablo-silverback commented 2 years ago

@thamidu thanks for the deep analysis, unfortunately that approach relies on ThreadLocal, which is not AFAIK safe between kotlin coroutines.

For something like this to be safe I believe exposed should take into consideration something like the context pattern (also used in Go's goroutines).

//cc @AlexeySoshin

ov7a commented 2 years ago

Hi! Could anyone clarify, why nested suspending transactions code (as recommended by @AlexeySoshin)

  newSuspendedTransaction {
      exec("SELECT 1")
      suspendedTransaction {
          exec("SELECT 2")
      }
  }

has different behavior in comparison with regular transactions?

  transaction {
      exec("SELECT 1")
      transaction {
          exec("SELECT 2")
      }
  }

With suspended transactions only one transaction is actually created and reused for both statements. But with regular transactions two transactions are created, one having another as outerTransaction.

AlexeySoshin commented 2 years ago

@pablo-silverback Kotlin coroutines support context.

You can see some examples there: https://github.com/JetBrains/Exposed/issues/1565

pabloogc commented 1 year ago

I wanted to have a similar behavior using regular transaction blocks and suspending ones, so code like this

suspendedTransaction {
    suspendedTransaction {
       insert(1)
    }
    suspendedTransaction {
       insert(2)
    }
    throw Error("Both should roll back")
}

rolls back both insert(1) and insert(2). Using simply newSuspendedTransaction will not roll back either as of version 0.42.1 but will work properly with regular transaction { ... }.

My actual use code are composition of use cases so if a later one fails it also rolls back the changes of previous ones.

This is the utility function I am using using the public APIs, it seems to work well with the clear limitation that there is still only one transaction at the root, so changing the DB or the isolation level is not possible currently.

import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
import org.jetbrains.exposed.sql.transactions.experimental.withSuspendTransaction
import kotlin.coroutines.CoroutineContext

suspend fun <T> suspendedTransaction(
    context: CoroutineContext? = null,
    db: Database? = null,
    transactionIsolation: Int? = null,
    statement: suspend Transaction.() -> T
) {
    val existing = TransactionManager.currentOrNull()
    if (existing == null) {
        // Create a new transaction, it will propagate the actual tx reference
        // using coroutine ThreadContextElements into the TransactionManager if this method is nested
        suspendedTransactionAsync(
            context = context,
            db = db,
            transactionIsolation = transactionIsolation,
            statement = statement
        ).await()
    } else {
        if (transactionIsolation != null || db != null) {
            exposedLogger.warn(
                "transactionIsolation=${transactionIsolation}, db=${db} " +
                        "was provided but will " +
                        "be ignored since this transaction was propagated."
            )
        }
        existing.withSuspendTransaction(context = context, statement = statement)
    }
}
mariusingjer commented 6 months ago

@pablo-silverback regarding this statement:

@thamidu thanks for the deep analysis, unfortunately that approach relies on ThreadLocal, which is not AFAIK safe between kotlin coroutines.

I am not sure you are entirely correct, "TransactionManager.currentOrNull()" relies on thread local, but Expose synchronizes the ThreadLocal state (take a look at https://github.com/JetBrains/Exposed/blob/123bba939fda7561efe0c8ead2f75787e088eaee/exposed-core/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt#L36). So I think the last solution posted by @thamidu will work.