arrow-kt / arrow

Λrrow - Functional companion to Kotlin's Standard Library
http://arrow-kt.io

async sagas - exception in one saga cancels other async saga ? #3320

Closed (T0X1CCAT closed this issue 9 months ago)

T0X1CCAT commented 9 months ago

Hi, I'd like some clarification on the behaviour of sagas. I have found that if I run two async sagas in one saga scope, an exception thrown in one saga prevents the other from completing. Is that behaviour expected, or is it due to how I am configuring the sagas? Is there a way to let the other saga complete its tasks? Thanks, Tom.

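// Imports assume kotlinx.coroutines and Arrow 1.2+ (arrow-resilience module).
import arrow.resilience.saga
import arrow.resilience.transact
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

suspend fun one() {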
    println("one on ${Thread.currentThread().name}")
    delay(500)
    println("one halfway done")
    delay(1000)
    println("one ending")
}

suspend fun two() {
    println("two on ${Thread.currentThread().name}")
    delay(1000)
    println("two ending")
    error("fail")
}

fun main() {
    runBlocking {
        val one = async(Dispatchers.Default) { one() }
        val two = async(Dispatchers.Default) { two() }

        saga {
            saga({
                one.await()
            }) {
                // rollback
                println("undo one")
            }

            saga({
                two.await()
            }) {
                // rollback
                println("undo two")
            }
        }.transact()
    }
}

This gives:

one on DefaultDispatcher-worker-1
two on DefaultDispatcher-worker-2
one halfway done
two ending
Exception in thread "main" java.lang.IllegalStateException: fail
    at SagaKt.two(saga.kt:20)
    at SagaKt$two$1.invokeSuspend(saga.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

Process finished with exit code 1

The output of println("one ending") never appears.

nomisRev commented 9 months ago

Hey @T0X1CCAT,

I think this behavior comes from runBlocking. The error in two cancels the parent CoroutineScope, which in turn cancels one. Here two throws IllegalStateException (from error("fail")), which runBlocking, as the parent CoroutineScope, detects, and it then cancels one.

So it's not related to the saga. If the parent did not cancel one, the output would be:

one on DefaultDispatcher-worker-1
two on DefaultDispatcher-worker-2
one halfway done
two ending
one ending
undo one

Here two.await() throws the exception, and the saga executes the compensation action of one.
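The parent cancellation described above can be reproduced without any saga. The following is a minimal sketch that assumes only kotlinx.coroutines; the function name `siblingCancelledByFailure` and the shortened delays are made up for illustration and are not part of the original report.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports whether the slow child was cancelled
// after its sibling failed inside the same runBlocking scope.
fun siblingCancelledByFailure(): Boolean {
    val cancelled = AtomicBoolean(false)
    try {
        runBlocking {
            val slow = async(Dispatchers.Default) {
                try {
                    delay(1500)
                } catch (e: CancellationException) {
                    // Record the cancellation, then rethrow so it propagates normally.
                    cancelled.set(true)
                    throw e
                }
            }
            val failing = async(Dispatchers.Default) {
                delay(100)
                error("fail") // throws IllegalStateException
            }
            slow.await()
            failing.await()
        }
    } catch (e: IllegalStateException) {
        // runBlocking rethrows the failure after all of its children have finished.
    }
    return cancelled.get()
}

fun main() {
    println(siblingCancelledByFailure())
}
```

No saga is involved here, so if this reports that the slow child was cancelled, the cancellation is coming from structured concurrency rather than from Arrow.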

Hope that helps! I'm closing this issue, but please open it again if something is not correct, or feel free to ask any follow-up questions!

T0X1CCAT commented 8 months ago

Thanks @nomisRev. For those new to coroutines like me, the example below might help:


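// Imports assume kotlinx.coroutines and Arrow 1.2+ (arrow-resilience module).
import arrow.resilience.saga
import arrow.resilience.transact
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

suspend fun one() {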
    println("one on ${Thread.currentThread().name}")
    delay(500)
    println("one halfway done")
    delay(1000)
    println("one ending")
}

suspend fun two() {
    println("two on ${Thread.currentThread().name}")
    delay(1000)
    println("two ending")
    error("fail")
}

suspend fun doWork() {
    // A supervisor job lets child jobs fail without failing this job or their siblings.
    val supervisor = SupervisorJob()
    // A new coroutine scope that inherits the current coroutine context but replaces its Job with the supervisor.
    // So if any child coroutine fails, the scope and the sibling coroutines are not affected.
    // Note: children are still cancelled if this scope is cancelled. Because SupervisorJob() is created
    // without a parent, the scope is not cancelled automatically when the caller is.
    val scope = CoroutineScope(coroutineContext + supervisor)

    with(scope) {
        // Run both one and two concurrently, on different threads
        val one = async(Dispatchers.Default) { one() }
        val two = async(Dispatchers.Default) { two() }

        saga {
            saga({
                one.await()
            }) {
                // rollback
                println("undo one")
            }

            saga({
                two.await()
            }) {
                // rollback
                println("undo two")
            }
        }.transact()
    }
}

fun main() = runBlocking {
    doWork()
}
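As a possible alternative to building the scope by hand, kotlinx.coroutines also provides the `supervisorScope` builder, which stays tied to the caller's Job while still isolating sibling failures. The sketch below leaves the saga out to stay self-contained and assumes only kotlinx.coroutines; the function name `awaitBoth`, the event list, and the shortened delays are illustrative and not from the thread.

```kotlin
import java.util.Collections
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical helper: runs two children under supervisorScope and
// returns the events in the order they were recorded.
suspend fun awaitBoth(): List<String> = supervisorScope {
    val events = Collections.synchronizedList(mutableListOf<String>())

    val one: Deferred<Unit> = async(Dispatchers.Default) {
        delay(300)
        events.add("one ending")
    }
    val two: Deferred<Unit> = async(Dispatchers.Default) {
        delay(100)
        events.add("two ending")
        error("fail") // fails this child only; the sibling keeps running
    }

    one.await()
    try {
        two.await() // the stored failure surfaces here
    } catch (e: IllegalStateException) {
        events.add("two failed: ${e.message}")
    }
    events.toList()
}

fun main() = runBlocking {
    println(awaitBoth())
}
```

In a saga, the `two.await()` call would sit inside the second saga action, so the failure would surface there and trigger the compensation of the first action, as in the example above.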