typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

Contention on Map when using IO.unsafeToFuture() #2663

Open yanns opened 2 years ago

yanns commented 2 years ago

From https://discord.com/channels/632277896739946517/632278585700384799/921743315169325087

In a project based on Future, we introduce IO step by step. We use IO.unsafeToFuture() for interoperability. I can observe the following locking:

[Screenshot 2021-12-18 at 13 35 46]

The whole application runs on one main ExecutionContext backed by a ForkJoinPool, built very similarly to scala.concurrent.ExecutionContext.opportunistic.

We build our own IORuntime to re-use the main ExecutionContext like this:

import cats.effect.unsafe.{IORuntimeConfig, Scheduler}

import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.concurrent.{blocking, ExecutionContext}

object IORuntimeFactory {

  /**
   * To be able to run some [[cats.effect.IO]], for example into a [[scala.concurrent.Future]],
   * we need a [[cats.effect.unsafe.IORuntime]]. This one is built from an existing execution context.
   * It does not follow the best practices from Cats Effect as we are using the same
   * execution context for blocking and non blocking operations.
   *
   * This should be ok as long as we don't use it a lot.
   *
   * In general, it's better to avoid using it, and to consider using a [[cats.effect.std.Dispatcher]] instead.
   */
  def from(ec: ExecutionContext): cats.effect.unsafe.IORuntime = {
    val compute = ec
    val blockingEC = new ExecutionContext {
      override def execute(runnable: Runnable): Unit = ec.execute(() => blocking(runnable.run()))
      override def reportFailure(cause: Throwable): Unit = ec.reportFailure(cause)
    }
    cats.effect.unsafe.IORuntime(compute, blockingEC, scheduler, () => (), IORuntimeConfig())
  }

  /** same scheduler as in [[cats.effect.unsafe.IORuntime.global]] */
  val scheduler: Scheduler = {
    val threadPrefix: String = "io-scheduler"
    val scheduler = new ScheduledThreadPoolExecutor(
      1,
      { r =>
        val t = new Thread(r)
        t.setName(threadPrefix)
        t.setDaemon(true)
        t.setPriority(Thread.MAX_PRIORITY)
        t
      })
    scheduler.setRemoveOnCancelPolicy(true)
    Scheduler.fromScheduledExecutor(scheduler)
  }

}

This runtime can be instantiated many times. For the observed contention, it is instantiated once and re-used.
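
For readers following along, here is a minimal sketch of the interop boundary described above. `LegacyInterop` and `toFuture` are hypothetical names, not part of the reporter's code; the runtime is taken as a parameter so that it can be the one built once with `IORuntimeFactory.from`. Each call to `unsafeToFuture()` starts a new fiber on the supplied runtime.

```scala
import cats.effect.IO
import cats.effect.unsafe.IORuntime

import scala.concurrent.Future

// Hypothetical helper, not taken from the issue.
object LegacyInterop {

  // Runs an IO at a Future-based boundary. The caller supplies the runtime,
  // for example one created once with IORuntimeFactory.from(mainEc) and re-used.
  def toFuture[A](io: IO[A])(implicit runtime: IORuntime): Future[A] =
    io.unsafeToFuture()
}
```

With an implicit `IORuntime` in scope, `LegacyInterop.toFuture(IO(21).map(_ * 2))` yields a `Future[Int]` that the surrounding Future-based code can compose as usual.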

armanbilge commented 2 years ago

Thanks for the detailed write up! Just repeating a couple notes from Discord for anyone reading the issue here:

  1. The SynchronizedMap here is used for tracking fibers for the new "fiber dump" feature introduced in 3.3.0. So, the easiest way to remove this overhead is simply to disable diagnostics with -Dcats.effect.tracing.mode=off argument to JVM.
  2. Cats Effect's own WorkStealingThreadPool is far better optimized for these diagnostics. So if possible, using the WorkStealingThreadPool as the shared ExecutionContext for your application is better than using an external ExecutionContext for the IORuntime.
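
The second suggestion could look roughly like the following sketch, assuming the application is free to choose its main ExecutionContext. `SharedPool`, `legacy`, and `migrated` are hypothetical names; `IORuntime.global` and its `compute` field are the only Cats Effect pieces relied on.

```scala
import cats.effect.IO
import cats.effect.unsafe.IORuntime

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical wiring, not taken from the issue.
object SharedPool {

  // Cats Effect's default runtime; on the JVM its compute pool is a WorkStealingThreadPool.
  val runtime: IORuntime = IORuntime.global

  // The same pool, exposed as a plain ExecutionContext for Future-based code.
  implicit val ec: ExecutionContext = runtime.compute

  // Existing Future-based code now runs on the runtime's compute pool.
  def legacy(): Future[Int] = Future(20 + 1)

  // Migrated code wraps the Future in IO and returns to Future at the boundary.
  def migrated(): Future[Int] =
    IO.fromFuture(IO(legacy())).map(_ * 2).unsafeToFuture()(runtime)
}
```

Whether this fits depends on how much of the application relies on ForkJoinPool-specific behaviour, so treat it as a starting point rather than a drop-in replacement.
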

yanns commented 2 years ago

> 1. The `SynchronizedMap` here is used for tracking fibers for the new "fiber dump" feature introduced in 3.3.0. So, the easiest way to remove this overhead is simply to disable diagnostics with [`-Dcats.effect.tracing.mode=off`](https://typelevel.org/cats-effect/docs/tracing#configuration) argument to JVM.

This is such a good feature - difficult decision to disable it.

> 2. Cats Effect's own `WorkStealingThreadPool` is far better optimized for these diagnostics. So if possible, using the `WorkStealingThreadPool` as the shared `ExecutionContext` for your application is better than using an external `ExecutionContext` for the `IORuntime`.

OK, I see. Our application is heavily based on Future, and only a small portion of it uses IO, to enable a progressive migration.

As the main execution context, mainly used to run Future, I don't see how WorkStealingThreadPool can work here.

scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks to add new threads if necessary, a feature that we cannot live without. And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:

val ThreadPoolMetrics: Map[MetricName, ForkJoinPool => Long] = {
  import ThreadPoolMetricsNames._
  Map(
    active -> (_.getActiveThreadCount),
    poolSize -> (_.getPoolSize),
    task -> (_.getActiveThreadCount),
    completedTask -> (_.getRunningThreadCount),
    queuedTasks -> (_.getQueuedSubmissionCount),
    stealCount -> (_.getStealCount)
  )
}
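
To make the shape of these metrics concrete, here is a self-contained sketch that uses only the JDK. `PoolMetricsSketch`, the string keys, and `sample` are hypothetical stand-ins for the application's `MetricName` and `ThreadPoolMetricsNames`, which are not shown in the issue.

```scala
import java.util.concurrent.ForkJoinPool

// Hypothetical reduction of the map above, keyed by plain strings.
object PoolMetricsSketch {

  // Stand-in for the application's MetricName type (an assumption).
  type MetricName = String

  val metrics: Map[MetricName, ForkJoinPool => Long] = Map(
    "poolSize"    -> ((p: ForkJoinPool) => p.getPoolSize.toLong),
    "queuedTasks" -> ((p: ForkJoinPool) => p.getQueuedSubmissionCount.toLong),
    "stealCount"  -> ((p: ForkJoinPool) => p.getStealCount)
  )

  // Applies every metric function to the pool, producing one snapshot.
  def sample(pool: ForkJoinPool): Map[MetricName, Long] =
    metrics.map { case (name, read) => name -> read(pool) }
}
```

Calling `PoolMetricsSketch.sample(pool)` periodically gives one reading per metric name. This only works while the main execution context is a `ForkJoinPool`, which is why the choice of pool matters for the metrics.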

I could not find any possibility to get some metrics from WorkStealingThreadPool.

In the end, either we keep the current execution context for the IORuntime as well and accept the thread contention, or we use two execution contexts: one for the Future part, and one for the IO part.

armanbilge commented 2 years ago

> scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks to add new threads if necessary, a feature that we cannot live without.

Good news, WorkStealingThreadPool supports this 👍

> And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:

WorkStealingThreadPool exposes similar metrics as MBeans, would that work for you? See the "Fiber Runtime Observability" in https://github.com/typelevel/cats-effect/releases/tag/v3.3.0
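
One way to explore what is registered is to query the platform MBean server directly. The sketch below uses only JDK JMX calls and assumes Scala 2.13 (for `scala.jdk.CollectionConverters`). The domain pattern `cats.effect*` is an assumption about how the runtime names its MBeans, not something stated in this thread, and `MBeanSketch` is a hypothetical name.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

import scala.jdk.CollectionConverters._

// Hypothetical helper for inspecting MBeans from inside the running JVM.
object MBeanSketch {

  // Assumption: the runtime's MBeans live in a domain starting with "cats.effect".
  val pattern: ObjectName = new ObjectName("cats.effect*:*")

  // Returns each matching MBean name together with the names of its attributes.
  def describe(): Map[String, List[String]] = {
    val server = ManagementFactory.getPlatformMBeanServer
    server.queryNames(pattern, null).asScala.toList.map { name =>
      name.getCanonicalName -> server.getMBeanInfo(name).getAttributes.toList.map(_.getName)
    }.toMap
  }
}
```

If nothing matches, `describe()` returns an empty map, which is also what it returns before any runtime has registered its MBeans. Individual values can then be read with `getAttribute` on the same server and fed into an existing metrics pipeline.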

yanns commented 2 years ago

> > scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks to add new threads if necessary, a feature that we cannot live without.

> Good news, WorkStealingThreadPool supports this 👍

I see now that cats.effect.unsafe.WorkerThread extends scala.concurrent.BlockContext 👍
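
The hook in question can be illustrated with the standard library alone. In this sketch, `BlockContextDemo` and `CountingBlockContext` are hypothetical names. A real pool would typically compensate with extra threads inside `blockOn`, whereas this version only counts the calls.

```scala
import scala.concurrent.{blocking, BlockContext, CanAwait}

// Hypothetical demo of how `blocking` is routed to the current BlockContext.
object BlockContextDemo {

  final class CountingBlockContext extends BlockContext {
    @volatile var blockingCalls: Int = 0

    // Invoked by scala.concurrent.blocking while this context is current.
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      blockingCalls += 1 // a thread pool would add a compensating thread here
      thunk
    }
  }

  // Runs two blocking sections under the counting context and returns the count.
  def run(): Int = {
    val ctx = new CountingBlockContext
    BlockContext.withBlockContext(ctx) {
      blocking { Thread.sleep(10) }
      blocking { 42 }
    }
    ctx.blockingCalls
  }
}
```

`BlockContextDemo.run()` returns 2, one for each `blocking` section. A thread that itself implements `BlockContext` receives these calls without any `withBlockContext` wrapper.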

> > And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:

> WorkStealingThreadPool exposes similar metrics as MBeans, would that work for you? See the "Fiber Runtime Observability" in https://github.com/typelevel/cats-effect/releases/tag/v3.3.0

Oh yes, this can be a way to achieve that. I'll have a look. Thanks!