typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0
2.01k stars 515 forks source link

IOApp hangs in Databricks 10.4LTS #3201

Open gatear opened 1 year ago

gatear commented 1 year ago

The issue a simple IOApp executes all required code but then hangs indefinitely on 3.3.14.

import cats.effect._

object Test extends IOApp {

  def run(args: List[String]):  IO[ExitCode] =
    IO.delay(ExitCode.Success)
}

The issue is somehow fixed on 3.4-4c07d83 where first time it runs successfully. The second run it hangs and I also get:

WARNING: Cats Effect global runtime already initialized; custom configurations will be ignored

For context on CE2 there were no such issues. Now I assume it could be related to the way the lifetime of Spark containers is managed in Databricks, the shutdown hooks are not run reliably.

armanbilge commented 1 year ago

It looks like it's not actually shutting down the JVM but instead re-invoking main. That's why you are getting the warning about the global runtime already being initialized: initializing the global runtime one of the first steps in main, and can only be initialized exactly once.

djspiewak commented 1 year ago

So I haven't used Databricks except for Spark jobs and notebooks… is this type of main runnable app usage common on the platform? In order to make this work, IOApp would need to detect Databricks specifically and behave differently. That's not without precedent (we detect sbt, for example), but we would need a way to test and maintain that.

gatear commented 1 year ago

@armanbilge initially on 3.3.14 it was hanging from the first main call and afterwards on 3.4-4c07d83 I was experimenting in Databricks notebooks. In a notebook the JVM will never stop after running a cell with an IOApp. Re-running a cell means re-invoking main here.

So by design an IOApp shouldn't be re-runnable in such a context?

@djspiewak I've used initially jar jobs (you need to specify the main class i.e. the IOApp) and afterwards explicit Test .main(..) calls from notebook cells. Notebook based jobs and jar jobs are 2 common ways of running scala code in Databricks.

I think auto-detecting Databricks would work only by looking at env vars or check the existence of a spark session. Need to look into it.

EDIT: this is probably the simplest way

  sys.env.contains("DATABRICKS_RUNTIME_VERSION")
armanbilge commented 1 year ago

I don't think IOApp is the right thing to use for notebooks. Instead you should use IO#unsafeRun* with IORuntime.global. A notebook is effectively a REPL.

@gatear it was not clear to me from your comment: do you have this issue when using jar jobs as well? That seems like a more serious issue in my opinion.

gatear commented 1 year ago

@armanbilge I see you're right a notebook is effectively a REPL however jar jobs can be configured with a job cluster (dedicated compute and JVM initialised for that particular job run) or an all-purpose one (which wouldn't necessarily shutdown after the job run and could actually try to run the jar job multiple times)

I will test if a jar job hangs the second time it runs on the same all-purpose cluster.

Regarding your question, I tried 2 versions of cats-effect:

djspiewak commented 1 year ago

all-purpose one (which wouldn't necessarily shutdown after the job run and could actually try to run the jar job multiple times)

This definitely seems like the one where IOApp would need to do some environment detection. Let's see though.

I see you're right a notebook is effectively a REPL

Just to confirm, I agree with @armanbilge here. In a notebook, you should just import cats.effect.unsafe.implicits._ and call unsafeRunSync() on your IOs. It's moderately ugly, but in a notebook context, IOApp isn't actually doing anything for you. You can't trigger fiber dumps, it's a persistent process so shutdown hooks are meaningless, the runtime setup doesn't matter because the JVM is persistent, etc etc etc. So you aren't gaining anything over unsafeRunSync() when you call main() other than loss of control over how the runtime is provisioned.

But a JAR job is a different question. :-) Let's see if we can make that better.

gatear commented 1 year ago

Ok so I can confirm the following on cats-effect 3.4-4c07d83

So I guess we can agree here the way to fix this is with specific platform support for Databricks? If yes I would like to look into it

djspiewak commented 1 year ago

I think it is indeed worth doing the platform specific Databricks detection and support for JAR jobs. Here's what I'm thinking:

  1. Let's do the thing you suggest with the environment variables. Help on this would be immensely appreciated! The ideal thing is if we can also set up some sort of automated test for it in CI, but I understand that's going to be dodgy
  2. We need a new documentation page mentioning this
  3. Said documentation page should also discuss notebooks and how IOApp is less suitable there and instead you should use an unsafeRun directly.
gatear commented 1 year ago

Ok I'll start a PR fixing this issue

wjoel commented 1 year ago

It might be worth checking Azure Synapse, Apache Zeppelin, and other Spark notebook environments to see if they run into similar problems. It might not be that specific to only Databricks.

gatear commented 1 year ago

I’ve started working on this issue; @wjoel I’ll focus on Databricks as I’m most familiar with this platform. I’ve noticed re-computing the runtime each time we call a main() will solve the problem.

So given:

object IOAppImpl extends IOApp {
  private def es() =
    Executors.newCachedThreadPool()

  private def scheduler(): (Scheduler, () => Unit) =
    createDefaultScheduler()

  override def runtime: IORuntime = {
    val (scheduler, cleanupScheduler) =
      this.scheduler()

    val service: ExecutorService =
      this.es()

    val context: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(service)

    IORuntime(context, context, scheduler, () =>  (), super.runtimeConfig)
  }

  override def run(args: List[String]): IO[ExitCode] =
    ???
}

this works

IOAppImpl.main(args)
IOAppImpl.main(args)

If I change the provided runtime into a val and also pass a clean-up, at the second run I would get:

Task cats.effect.IOFiber RUNNING rejected from java.util.concurrent.ThreadPoolExecutor@7c0c77c7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 8]

It makes sense, the first run cleans-up and terminates the pool.

With the default cats.effect.unsafe.WorkStealingThreadPool it looks like after the first IOApp run, the pool is shutdown and follow-up runs that try to use it end-up waiting indefinitely i.e. execution stops at:

        val result = blocking(queue.take())

@djspiewak @armanbilge Is this expected ? Maybe we could recommend for dynamic environments (like notebooks) to provide a runtime with no intrinsic clean-up?

armanbilge commented 1 year ago

Maybe we could recommend for dynamic environments (like notebooks) to provide a runtime with no intrinsic clean-up?

Yup, see recommendations for notebooks here: https://github.com/typelevel/cats-effect/issues/3201#issuecomment-1276342591.