arrow-kt / arrow

Λrrow - Functional companion to Kotlin's Standard Library
http://arrow-kt.io
Other
6.13k stars 442 forks source link

Exception in first async saga results in 2nd saga compensation block not being called #3323

Closed T0X1CCAT closed 6 months ago

T0X1CCAT commented 8 months ago

Hi there, in the below example I have 2 async tasks configured in a saga scope. I find that when the first task throws an exception the compensation block of the 2nd task is not run. However when the second task throws an exception the compensation block of the first task is thrown. Is my expectation incorrect that in the first case the second task should have the compensation block executed (I expect "undo two" to be printed)? Thanks Tom.

import arrow.resilience.saga
import arrow.resilience.transact
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.coroutineContext

suspend fun one() {
    println("one on ${Thread.currentThread().name}")
    delay(500)
    println("one halfway done")
    delay(1000)
    error("fail")
    println("one ending")
}

suspend fun two() {
    println("two on ${Thread.currentThread().name}")
    delay(1000)
    println("two ending")

}

suspend fun doWork() {
    // A supervisor job, that allows children jobs to fail without causing this job to fail
    val supervisor = SupervisorJob()
    // A new coroutine scope inheriting the current coroutine context, but adding in the supervisor job.
    // So if any child coroutine fails, the parent and/or other sibling coroutines are not affected.
    // Note: child coroutines will still be cancelled if the parent is cancelled.
    val scope = CoroutineScope( coroutineContext + supervisor)

    with(scope) {
        // Run both one and two concurrently, on different threads
        val one = async(Dispatchers.IO) { one() }
        val two = async(Dispatchers.IO) { two() }

        saga {
            saga({
                one.await()
            }) {
                // rollback
                println("undo one")
            }

            saga({
                two.await()
            }) {
                // rollback
                println("undo two")
            }
        }.transact()
    }
}

fun main() = runBlocking {
    doWork()
}
serras commented 7 months ago

@nomisRev did you have the chance of looking at this one?

nomisRev commented 6 months ago

Hey @T0X1CCAT,

I think your understanding of Saga might be incorrect, but it might also be due to the complexity of introducing async 😅

So one and two are running in parallel, and one throws after two is finished. However, you're not running both Sagas in parallel. Every saga({ }) { } blocks registers its compensation, and then runs the action.

 val one = async(Dispatchers.IO) { one() }
val two = async(Dispatchers.IO) { two() }

saga {
  saga({ one.await() }) { println("undo one") }
  saga({ two.await() }) { println("undo two") }
}

In your snippet, it registers println("undo one") as a compensation and awaits the results of one. Which is error("fail"), so it fails and runs the compensations (in this case only "undo one").

Alternatively you could run both in parallel:

saga {
   val one = async(Dispatchers.IO) { saga({ one() }) { println("undo one") } }
   val two = async(Dispatchers.IO) { saga({ two() }) { println("undo one") } }
   one.await()
   two.await()
}

Alternatively you can use parZip from Arrow Fx:

saga {
   parZip(
     { saga({ one() }) { println("undo one") } },
     { saga({ two() }) { println("undo one") } }
   ) { one, two -> }
}

I hope this helps! Let me know if you have any other questions, or doubts.