typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

Different exception handling on the work stealing runtime compared to a fixed thread executor #1898

Open vasilmkd opened 3 years ago

vasilmkd commented 3 years ago

The work stealing runtime handles exceptions thrown by Runnables submitted through EC#execute more safely than a different Executor does. Throwing exceptions on the worker threads of a fixed thread pool results in the death of those threads, and depending on the configuration of the ThreadFactory, those threads may or may not be replaced by new ones. Furthermore, throwing fatal exceptions in a Runnable on any other thread pool will not currently result in a graceful shutdown of an IOApp, because the bespoke logic that handles this is part of the IOFiber runloop.

Currently, this safety comes at a performance cost for the work stealing runtime, which has to suspend every submitted Runnable in an IO.delay. I think I have a way of equalizing the exception handling between the work stealing thread pool and any other thread pool used as a compute pool for IO, while also improving the performance of the work stealing runtime as a normal execution context for running Runnables. However, this essentially comes at a performance cost for other ExecutionContexts used as a compute pool.

Here is a sample application with a fixed thread pool that shows this behavior:

package cats.effect
package example

import cats.effect.unsafe.IORuntime

object Example extends IOApp.Simple {

  override def runtime: IORuntime = {
    import java.util.concurrent.atomic.AtomicInteger
    import java.util.concurrent.Executors
    import cats.effect.unsafe.IORuntimeConfig
    import cats.effect.unsafe.Scheduler
    import scala.concurrent.ExecutionContext

    val (blocking, blockDown) = {
      val threadCount = new AtomicInteger(0)
      val executor = Executors.newCachedThreadPool { (r: Runnable) =>
        val t = new Thread(r)
        t.setName(s"io-blocking-${threadCount.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
      (ExecutionContext.fromExecutor(executor), () => executor.shutdown())
    }

    val (scheduler, schedDown) = {
      val executor = Executors.newSingleThreadScheduledExecutor { r =>
        val t = new Thread(r)
        t.setName("io-scheduler")
        t.setDaemon(true)
        t.setPriority(Thread.MAX_PRIORITY)
        t
      }
      (Scheduler.fromScheduledExecutor(executor), () => executor.shutdown())
    }

    val (compute, compDown) = {
      val threadCount = new AtomicInteger(0)
      val executor = Executors.newFixedThreadPool(
        8,
        { r =>
          if (threadCount.get() >= 8) null
          else {
            val t = new Thread(r)
            t.setName(s"io-compute-${threadCount.getAndIncrement()}")
            t.setDaemon(true)
            t
          }
        })
      (ExecutionContext.fromExecutor(executor), () => executor.shutdown())
    }

    val cancelationCheckThreshold =
      System.getProperty("cats.effect.cancelation.check.threshold", "512").toInt

    new IORuntime(
      compute,
      blocking,
      scheduler,
      () => {
        compDown()
        blockDown()
        schedDown()
      },
      IORuntimeConfig(
        cancelationCheckThreshold,
        System
          .getProperty("cats.effect.auto.yield.threshold.multiplier", "2")
          .toInt * cancelationCheckThreshold
      )
    )
  }

  def run: IO[Unit] =
    IO.executionContext.flatMap { ec =>
      IO {
        for (_ <- 0 until 20) {
          ec.execute(() => throw new RuntimeException("Boom!"))
        }
      }
    } *> IO.never[Unit]
}

Executing this application results in the following situation in JDK Mission Control, where all io-compute threads are dead (screenshot: Screen Shot 2021-04-11 at 13 34 45).

vasilmkd commented 3 years ago

And here's the same app with the work stealing runtime:

package cats.effect
package example

object Example extends IOApp.Simple {

  def run: IO[Unit] =
    IO.executionContext.flatMap { ec =>
      IO {
        for (_ <- 0 until 20) {
          ec.execute(() => throw new RuntimeException("Boom!"))
        }
      }
    } *> IO.never[Unit]
}

And the Mission Control screenshot (Screen Shot 2021-04-11 at 13 44 27).

vasilmkd commented 3 years ago

I guess the question I have is whether it's Cats Effect's responsibility to protect the user from potentially ruining the compute thread pool with random Runnables which may throw?

RafalSumislawski commented 3 years ago

It's hard to comment on the specifics of your idea for solving this problem, because I don't know what the idea is. So I'll stay at an abstract level and just say which of these aspects I see as important.

The WorkStealingThreadPool currently carries a large performance handicap for trying to be more protective than other ExecutionContexts. I would love to see it show its full performance potential.

I think the inconsistent behaviour on error is a big problem for someone swapping ExecutionContext implementations. As far as I understand, we can't really bend the other ExecutionContext implementations to behave like WorkStealingThreadPool (we can wrap them in something, but that doesn't stop someone from submitting Runnables directly to the underlying executor). We can, however, make WorkStealingThreadPool behave like other execution contexts, at least in the sense that throwing an exception in WorkStealingThreadPool would no longer shut down the application.

Overall, my opinion is that it's cats-effect's job to protect us when we use pure cats-effect API. But when we use external, impure APIs like ExecutionContext's API, it's NOT cats-effect's job to protect us.

Nonetheless, I would expect the WorkStealingThreadPool not to lose threads like newFixedThreadPool does ;).

djspiewak commented 3 years ago

@vasilmkd What's the current status on this?

vasilmkd commented 3 years ago

I believe the discrepancy I described in the original post still exists between our own work stealing pool and a third-party one, as no real work has been done on this. If desirable, the solution isn't too hard: we need to wrap Runnables posted on the executor in our own Runnable type which has a try/catch block.
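
For illustration, here is a minimal sketch of what such a wrapper could look like. It is not the Cats Effect implementation: GuardedRunnable, GuardedExecutionContext and GuardedDemo are hypothetical names, and the sketch assumes that non-fatal exceptions should simply be forwarded to reportFailure. It deliberately leaves fatal errors alone, so it does not address the graceful IOApp shutdown discussed above.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

// Hypothetical wrapper type (not part of Cats Effect): runs the submitted
// Runnable and reports non-fatal exceptions instead of letting them
// propagate and terminate the worker thread.
final class GuardedRunnable(underlying: Runnable, report: Throwable => Unit)
    extends Runnable {
  def run(): Unit =
    try underlying.run()
    catch {
      // Fatal errors (e.g. OutOfMemoryError) are not matched by NonFatal
      // and still propagate; handling them is out of scope for this sketch.
      case NonFatal(t) => report(t)
    }
}

// Hypothetical ExecutionContext that wraps every Runnable passed to execute.
// Runnables submitted directly to the underlying executor bypass it.
final class GuardedExecutionContext(delegate: ExecutionContext)
    extends ExecutionContext {
  def execute(runnable: Runnable): Unit =
    delegate.execute(new GuardedRunnable(runnable, reportFailure))

  def reportFailure(cause: Throwable): Unit =
    delegate.reportFailure(cause)
}

object GuardedDemo {
  // Returns (threads created by the pool, failures reported).
  def run(): (Int, Int) = {
    val threadsCreated = new AtomicInteger(0)
    val failures = new AtomicInteger(0)

    // Counts how many worker threads the pool asks for.
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, s"demo-compute-${threadsCreated.getAndIncrement()}")
      t.setDaemon(true)
      t
    }

    val executor = Executors.newFixedThreadPool(1, factory)
    val underlying = ExecutionContext.fromExecutor(
      executor,
      (_: Throwable) => { failures.incrementAndGet(); () })
    val ec = new GuardedExecutionContext(underlying)

    // Same shape as the example in the original post.
    for (_ <- 0 until 5) {
      ec.execute(() => throw new RuntimeException("Boom!"))
    }

    // The single worker processes tasks in submission order, so once this
    // task has run, all throwing tasks have already been handled.
    val done = new CountDownLatch(1)
    ec.execute(() => done.countDown())
    done.await(5, TimeUnit.SECONDS)
    executor.shutdown()

    (threadsCreated.get(), failures.get())
  }
}
```

In this sketch the single worker thread survives all five throwing Runnables, so the thread factory is only asked for one thread. Submitting the same Runnables to the executor directly, without the wrapper, would be expected to kill the worker each time and force the pool to request a replacement. The extra allocation and try/catch per submitted Runnable is the kind of cost on other ExecutionContexts mentioned in the original post.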