typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

unsafeToFuture fails to raise a second InterruptedException if the exception is handled and retried #4055

Open swapdonkey opened 2 months ago

swapdonkey commented 2 months ago

The code below catches an ExecutionException wrapping the InterruptedException on the first iteration. On the second iteration no ExecutionException is thrown, and the main thread blocks on the Await.

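```scala
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import java.util.concurrent.ExecutionException
import scala.concurrent.Await
import scala.concurrent.duration.Duration

while (true) {
  // Reported: iteration 1 prints the wrapped InterruptedException,
  // iteration 2 blocks forever on Await.result
  val f = IO {
    throw new InterruptedException("Test")
  }.attempt.unsafeToFuture()(IORuntime.global)

  try {
    Await.result(f, Duration.Inf)
  } catch {
    case t: ExecutionException => println(s"Underlying exception ${t.getCause}")
  }
}
```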
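The ExecutionException wrapper is consistent with how the Scala standard library handles a Promise that is failed with an InterruptedException: it boxes the exception, along with any Error or ControlThrowable, in an ExecutionException. The sketch below is a minimal stdlib-only illustration that fails a plain Promise by hand. It assumes that unsafeToFuture completes an ordinary scala.concurrent.Promise. BoxingDemo is a hypothetical name.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object BoxingDemo {
  // Fails a plain Promise with an InterruptedException and returns whatever
  // Await.result throws. The standard library boxes InterruptedException
  // inside an ExecutionException when a Promise is completed with it.
  def awaitInterruptedFailure(): Throwable = {
    val p = Promise[Int]()
    p.failure(new InterruptedException("Test"))
    Try(Await.result(p.future, 1.second)) match {
      case Failure(t) => t
      case other      => sys.error(s"expected a failure, got $other")
    }
  }

  def main(args: Array[String]): Unit = {
    val t = awaitInterruptedFailure()
    println(s"${t.getClass.getName} caused by ${t.getCause}")
  }
}
```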

I think the issue is in IOFiber.onFatalFailure, specifically the line if (IORuntime.globalFatalFailureHandled.compareAndSet(false, true)). On the first iteration globalFatalFailureHandled is initialised to false. On the second iteration it's already true, so the code block is never executed and the exception isn't raised.

If that is the case, could a function like resetFatalFailureHandled be added to IORuntime? It would reset the flag when the ExecutionException from the future returned by unsafeToFuture is handled and the IO is retried in a loop.
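The suspected guard can be modelled on its own. The sketch below is a hypothetical stand-in, not the Cats Effect source. A JVM-wide AtomicBoolean is flipped with compareAndSet(false, true), so only the first caller "handles" a fatal failure. The resetFatalFailureHandled here only illustrates the reset being proposed and is not an existing IORuntime method.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of a JVM-wide "fatal failure already handled" flag.
object FatalFlagModel {
  private val handled = new AtomicBoolean(false)

  // True only for the first caller. Later calls find `true` already stored,
  // so compareAndSet fails and the handler body would be skipped.
  def tryHandleFatal(): Boolean = handled.compareAndSet(false, true)

  // Hypothetical reset, mirroring the proposed resetFatalFailureHandled.
  def resetFatalFailureHandled(): Unit = handled.set(false)

  def main(args: Array[String]): Unit = {
    println(tryHandleFatal()) // first fatal failure: handled
    println(tryHandleFatal()) // second one: skipped
    resetFatalFailureHandled()
    println(tryHandleFatal()) // handled again after the reset
  }
}
```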

durban commented 2 months ago

It seems there are multiple things going on here. The essential thing is that when a fatal exception is detected (which InterruptedException is), we're shutting down the whole world (meaning all the IORuntimes in the current JVM).
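The standard library classification shows why InterruptedException counts as fatal. This assumes that Cats Effect's notion of "fatal" matches scala.util.control.NonFatal for this case. Under that assumption, .attempt, which only captures non-fatal errors, never sees the exception. FatalClassification and isFatal are hypothetical names.

```scala
import scala.util.control.NonFatal

object FatalClassification {
  // NonFatal(t) is false for VirtualMachineError, ThreadDeath,
  // InterruptedException, LinkageError and ControlThrowable.
  def isFatal(t: Throwable): Boolean = !NonFatal(t)

  def main(args: Array[String]): Unit = {
    println(isFatal(new InterruptedException("Test")))
    println(isFatal(new RuntimeException("Test")))
  }
}
```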

There are incorrect things I see in the behavior of CE with the code above:

These should be fixed, but I don't think fixing them will make the code example "work". The idea is that if there is a fatal exception, the most we can hope for is a clean shutdown.

What would you consider "correct behavior" for this code example?

djspiewak commented 2 months ago

So the tricky thing here is that InterruptedException, like all fatal errors, fully torpedoes the runtime. By the time you get to the second iteration, the threads are all shut down and gone, so it's impossible to use global to execute anything. The design expectation is that you're not going to try to execute anything after this point, and you'll basically just shut down the VM.
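Once the runtime's threads are gone, nothing completes the Future returned by unsafeToFuture, so Await.result(f, Duration.Inf) in the reproducer hangs. In the minimal stdlib-only sketch below, a Promise that is never completed stands in for that Future. It shows that a finite timeout turns the hang into a TimeoutException. The timeout is an assumption added for illustration; the thread does not propose it. BoundedWait is a hypothetical name.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object BoundedWait {
  // Stand-in for a Future whose runtime has shut down: nothing will ever
  // complete this Promise, so only the timeout ends the wait.
  def awaitAbandoned(timeout: FiniteDuration): Try[Int] = {
    val abandoned = Promise[Int]()
    Try(Await.result(abandoned.future, timeout))
  }

  def main(args: Array[String]): Unit =
    awaitAbandoned(100.millis) match {
      case Failure(_: TimeoutException) => println("timed out instead of hanging")
      case other                        => println(s"unexpected: $other")
    }
}
```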

Of course, that's not what you want here, since you're trying to run new things after we torpedo the runtime. The answer is probably for you not to use global and instead to make your own IORuntime (probably using the builder). Whenever you catch and recover from a fatal error, build a new runtime for yourself and discard the old one.
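The sketch below illustrates this suggestion. It assumes Cats Effect 3's IORuntime.builder() with its default compute, blocking and scheduler pools. FreshRuntime and runOnFreshRuntime are hypothetical names. It only shows building a dedicated runtime, running a non-fatal IO on it, and shutting it down afterwards.

```scala
import cats.effect.IO
import cats.effect.unsafe.IORuntime

object FreshRuntime {
  // Builds a dedicated runtime instead of relying on IORuntime.global.
  def newRuntime(): IORuntime = IORuntime.builder().build()

  // Runs `io` on a freshly built runtime and shuts that runtime down
  // afterwards, so it is never reused once this call returns.
  def runOnFreshRuntime[A](io: IO[A]): A = {
    val runtime = newRuntime()
    try io.unsafeRunSync()(runtime)
    finally runtime.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(runOnFreshRuntime(IO(21 * 2)))
}
```

Later comments in the thread report that recreating the runtime on each iteration did not, by itself, make the second iteration of the reproducer throw.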

armanbilge commented 2 months ago

So the tricky thing here is that InterruptedException, like all fatal errors, fully torpedoes the runtime. By the time you get to the second iteration, the threads are all shut down and gone, so it's impossible to use global to execute anything

Hmm, but maybe we can fix this. We already made some changes so that when the global runtime shuts down, it removes itself so that a new one may be installed.
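The mechanism described here can be modelled as a global slot that a runtime clears when it shuts down, so the next access installs a fresh one. The sketch below is a hypothetical model, not Cats Effect code. GlobalSlotModel, Runtime, global and shutdown are illustrative names.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical model of a self-removing global runtime slot.
object GlobalSlotModel {
  final class Runtime(val id: Int)

  private val created = new AtomicInteger(0)
  private val slot = new AtomicReference[Runtime](null)

  // Returns the installed runtime, installing a new one if the slot is empty.
  @annotation.tailrec
  def global(): Runtime = {
    val current = slot.get()
    if (current != null) current
    else {
      val fresh = new Runtime(created.incrementAndGet())
      if (slot.compareAndSet(null, fresh)) fresh else global()
    }
  }

  // Shutting down removes the runtime from the slot, if it is still installed.
  def shutdown(rt: Runtime): Unit = { slot.compareAndSet(rt, null); () }
}
```

Clearing such a slot does not touch other JVM-wide state, such as a once-only flag like globalFatalFailureHandled.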

Not entirely sure why that's not working in this instance.

durban commented 2 months ago

Not entirely sure why that's not working in this instance.

It's working. The second iteration of the while loop executes the IO on a different IORuntime. But then the fatal failure handling code doesn't entirely run, because globalFatalFailureHandled is already true.

swapdonkey commented 2 months ago

As per @durban's comment, creating the runtime on every iteration still causes the second iteration not to throw an exception outside of the IO.

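```scala
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import java.util.concurrent.ExecutionException
import scala.concurrent.Await
import scala.concurrent.duration.Duration

while (true) {
  // A fresh runtime per iteration. The shutdown hooks returned by the
  // factories are discarded and no-op hooks are passed to the builder.
  val (compute, _) = IORuntime.createWorkStealingComputeThreadPool()
  val (blocking, _) = IORuntime.createDefaultBlockingExecutionContext()
  val (scheduler, _) = IORuntime.createDefaultScheduler()
  val ioRuntime = IORuntime.builder()
    .setCompute(compute, () => ())
    .setBlocking(blocking, () => ())
    .setScheduler(scheduler, () => ())
    .build()

  val f = IO {
    throw new InterruptedException("Test")
  }.attempt.unsafeToFuture()(ioRuntime)

  try {
    Await.result(f, Duration.Inf)
  } catch {
    case t: ExecutionException => println(s"Underlying exception ${t.getCause}")
  }
}
```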

To give some context, we are migrating from Cats Effect 2 to 3. Our current implementation runs the IO in separate threads managed via JMX. An InterruptedException from the IO is caught in its thread, the interrupt is reset, and the thread then continues. Though this isn't an ideal mechanism, I was hoping that with Cats Effect 3 we could keep the same handling to minimise the changes, i.e. a separate IORuntime for each thread. When an interrupt is thrown, it's caught in the main thread, which resets the interrupt, recreates the IORuntime and continues.
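Assuming "resetting the interrupt" means clearing the current thread's interrupt status, the JDK call for that is Thread.interrupted(). It returns the flag and clears it in the same call. The minimal sketch below uses the hypothetical names InterruptReset and clearInterrupt.

```scala
object InterruptReset {
  // Thread.interrupted() reports whether the current thread was interrupted
  // and clears the interrupt flag as a side effect.
  def clearInterrupt(): Boolean = Thread.interrupted()

  def main(args: Array[String]): Unit = {
    Thread.currentThread().interrupt()                          // simulate an interrupt
    val wasInterrupted = clearInterrupt()                       // true, flag now cleared
    val stillInterrupted = Thread.currentThread().isInterrupted // false
    println(s"was=$wasInterrupted still=$stillInterrupted")
  }
}
```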

durban commented 2 months ago

There are incorrect things I see in the behavior of CE with the code above:

  • The IORuntime doesn't seem to shut down cleanly.

This seems to be a bug, see #4066 and #4067.