typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0
2k stars 513 forks

Unsafe run sync in CE3 #2980

Open flipp5b opened 2 years ago

flipp5b commented 2 years ago

I'm trying to migrate my library (an FP-friendly wrapper for hbase-client) from CE2 to CE3. Part of the library is built on top of an impure callback-based Java API. This API has a restriction: actions inside the callbacks must be executed synchronously on the caller thread. The interface I'm trying to implement is AdvancedScanResultConsumer. Note the Javadoc of ScanController: "... you should only call the suspend() or terminate() inside onNext or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places."

In CE2, I used f.toIO.unsafeRunSync() for this purpose. As far as I can see, it runs all the actions on the current thread, and everything works fine.

In CE3, I tried to solve it using Dispatcher.unsafeRunSync and IO.evalOn. But as far as I understand, dispatcher.unsafeRunSync(f.evalOn(ec)) starts evaluation on the default compute pool while blocking the caller thread, and then shifts execution to ec. The caller thread, however, is exactly the thread on which I'd like to execute the actions described by f.

There's a hacky solution using both Dispatcher.unsafeRunSync and IO.syncStep. If I reorder the actions inside the callback, placing the synchronous actions (which must be executed on the caller thread) before the asynchronous ones, then the following code works:

import cats.effect.IO
import cats.effect.std.{Dispatcher, Queue}

import org.apache.hadoop.hbase.client.{AdvancedScanResultConsumer, Result}
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanController

class ScanResultConsumerImpl(dispatcher: Dispatcher[IO], queue: Queue[IO, Array[Result]]) extends AdvancedScanResultConsumer {
  override def onNext(results: Array[Result], controller: ScanController): Unit = {
    unsafeRun(
      for {
        // controller.suspend() must be called from the exact same thread as the onNext itself
        _ <- IO.delay(controller.suspend())
        // Other synchronous steps...
        _ <- queue.offer(results)
      } yield ()
    )
  }

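  // Runs the synchronous prefix of f on the caller thread; whatever remains after the
  // first asynchronous boundary is handed to the dispatcher, blocking until it completes.
  private def unsafeRun[A](f: IO[A]): A =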
    f.syncStep.unsafeRunSync() match {
      case Right(result) => result
      case Left(asyncF)  => dispatcher.unsafeRunSync(asyncF)
    }

  // The remainder is skipped for brevity
}

It would be great to have a built-in mechanism to handle such use cases in CE3.

armanbilge commented 2 years ago

@flipp5b and I discussed this a bit on Discord. What's interesting is that the unsafeRun method in the snippet above is essentially what I implemented for JS in https://github.com/typelevel/cats-effect/pull/2846. So the question is whether we should make that available for JVM as well.

djspiewak commented 2 years ago

So syncStep and SyncIO definitely feel like the right tools for resolving this. Just out of curiosity, is there a reason you can't be more explicit about this, by expressing the controller.suspend() in a SyncIO rather than in an IO?

flipp5b commented 2 years ago

@djspiewak, I agree that SyncIO would probably be a better choice here. The snippet above is a simplified version. In the full version of the code, the suspension part also includes some Ref-based machinery. For now, it's of type Ref[IO, T] so the controller.suspend() is also expressed in IO. But I suppose I can use Ref[SyncIO, T] instead of Ref[IO, T] and rewrite the onNext method as follows:

  override def onNext(results: Array[Result], controller: ScanController): Unit = {
    val suspendIfNeeded: SyncIO[Unit] = ??? // Express suspension based on Refs and SyncIO.delay(controller.suspend())
    suspendIfNeeded.unsafeRunSync()
    dispatcher.unsafeRunSync(queue.offer(results))
  }
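
To make the Ref[SyncIO, T] idea concrete, here is a minimal sketch, not the library's actual code. Controller, pending, and limit are hypothetical names introduced for illustration: Controller stands in for HBase's ScanController, and "suspend once limit batches are pending" is an assumed policy. Because every step is a SyncIO, unsafeRunSync() executes all of it on the calling thread.

```scala
import cats.effect.{Ref, SyncIO}

object SuspendSketch {
  // Hypothetical stand-in for HBase's ScanController, so the sketch compiles on its own.
  trait Controller { def suspend(): Unit }

  // Increments the pending counter and suspends the scan once `limit` is reached.
  // Returns true if suspend() was called. (Assumed policy, for illustration only.)
  def suspendIfNeeded(
      pending: Ref[SyncIO, Int],
      limit: Int,
      controller: Controller
  ): SyncIO[Boolean] =
    pending.updateAndGet(_ + 1).flatMap { n =>
      if (n >= limit) SyncIO.delay(controller.suspend()).map(_ => true)
      else SyncIO.pure(false)
    }
}
```

Inside onNext this would be invoked as suspendIfNeeded(...).unsafeRunSync(), followed by the dispatcher call for the asynchronous part.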

kamilkloch commented 1 year ago

We are facing the very same issue. The code below hacks IORuntime into the current caller thread:

  implicit val runtime: IORuntime = IORuntimeBuilder()
    .setCompute(ExecutionContext.fromExecutor(_.run()), () => ())
    .build()

armanbilge commented 1 year ago

Note that there is now IO#syncStep, so there is no need to hack. But it is helpful to know why you need this specific behavior.
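
For reference, a minimal sketch of calling syncStep directly. It assumes a Cats Effect version in which syncStep takes a limit argument bounding how many synchronous stages are evaluated (3.4 or later, as far as I know; the snippet earlier in this thread uses the older no-argument form). runPrefix is a hypothetical helper name.

```scala
import cats.effect.IO

object SyncStepSketch {
  // Runs the synchronous prefix of `io` on the calling thread.
  //   Right(a)   : the whole IO was synchronous and completed here.
  //   Left(rest) : an asynchronous boundary was reached; `rest` still has to be
  //                run some other way (e.g. through a Dispatcher).
  def runPrefix[A](io: IO[A]): Either[IO[A], A] =
    io.syncStep(Int.MaxValue).unsafeRunSync()
}
```

No IORuntime is needed for the synchronous part, since SyncIO#unsafeRunSync() simply evaluates on the current thread.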

kamilkloch commented 1 year ago

I steal the reasoning worded by @flipp5b on Discord:

armanbilge commented 1 year ago

I spoke with @kamilkloch on Discord. In their case, they needed to run certain IOs on a specific thread in order to interact with a library. Fortunately they have an ExecutionContext for that thread. The ideal thing to do in this instance is to use .evalOn(...): this is exactly the use case it was designed for.

While changing the IORuntime in this manner is possible (and not really a hack), I would strongly recommend against it. The reason is that this special thread is a very precious resource: you have to be on it to interact with the library. That means you don't want to clog it up with unrelated tasks, since that will delay the execution of the tasks that need it.

Instead you want to specifically .evalOn(...) the IOs that need to be on that thread. The rest of your IOs can run on the normal compute pool and take full advantage of a multicore system. This will ultimately have higher throughput.
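
A minimal sketch of that pattern, assuming the special thread can be modelled by a single-threaded executor. pinned, threadName, and program are hypothetical names, and the executor is a stand-in for whatever ExecutionContext the library actually provides.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.concurrent.ExecutionContext
import cats.effect.{IO, Resource}

object EvalOnSketch {
  // Names the single worker thread "pinned" so it is easy to recognise.
  private val factory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = new Thread(r, "pinned")
  }

  // Stand-in for the library's special thread, shut down when the Resource is released.
  val pinned: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor(factory): ExecutorService))(ex => IO(ex.shutdown()))
      .map(ex => ExecutionContext.fromExecutorService(ex): ExecutionContext)

  val threadName: IO[String] = IO(Thread.currentThread().getName)

  // Only the middle step is shifted onto the pinned thread;
  // the steps before and after it run on the regular compute pool.
  val program: IO[(String, String, String)] =
    pinned.use { ec =>
      for {
        before <- threadName
        on     <- threadName.evalOn(ec)
        after  <- threadName
      } yield (before, on, after)
    }
}
```

Run with the default runtime (import cats.effect.unsafe.implicits.global), the middle element should be "pinned" while the other two name compute-pool threads, since evalOn shifts back to the previous ExecutionContext once its IO completes.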

kamilkloch commented 1 year ago

The case of https://github.com/real-logic/aeron/tree/master/aeron-cluster looks a bit more subtle: determinism is paramount, all of the BL must run on a single Aeron cluster worker thread, and interaction with the external world (e.g. receiving a response from an external service) must be channelled through the cluster ingress. There is not much headroom to leverage multiple cores. In practice, one probably ends up manually wrapping every single IO with evalOn.

armanbilge commented 1 year ago

@kamilkloch the linked documentation does not mention the word "thread" on that page ... would you mind pointing me to where exactly their requirements are described?

armanbilge commented 1 year ago

The Aeron "Best Practices Guide" says:

Aeron applications have most of the threading requirements controlled by the application. However, there is a per Aeron instance background thread, called the ClientConductor, that handles housekeeping and interacting with the Media Driver commands

https://github.com/real-logic/aeron/wiki/Best-Practices-Guide#application-threads

That sounds straightforward. Does clustering have specific requirements, described elsewhere?

kamilkloch commented 1 year ago

@armanbilge https://aeroncookbook.com/agrona/threads-agents-duty-cycles/

Agents can deal with being executed like this without change because all they do is execute a duty cycle. They do some work, and then optionally wait. And they repeat this task forever until stopped.

As a result, correctly constructed Agents generally do not break these guidelines:

  • never make use of internal threads. Don't fire up an executor. If absolutely required, consider adding tasks to a List and processing them on the current thread as a part of the duty cycle;
  • never expose public methods that are called across Agents, or Threads;
  • never make use of shared datastructures that are not thread-safe across agents. Concurrent collections, such as Agrona's RingBuffers and IPC messaging systems such as Aeron are well suited for cross agent communication

Aeron cluster: https://aeroncookbook.com/aeron-cluster/basic-sample/

armanbilge commented 1 year ago

@kamilkloch thanks for the references and context, that was very helpful.

From what I read, my understanding is that Agrona Agents are a very different model for concurrency and multithreading than the Cats Effect fiber-based model. Cats Effect is designed for building highly parallel applications with shared concurrent state. Meanwhile, Agrona Agents seem to encourage a message-passing style of architecture.

Of course, with effort you can mix Cats Effect and Agrona Agents, but my guess is that the whole will be less than the sum of its parts.

For example, if you are using Agents for performance, then the overhead of IO allocations and synchronization in the fiber runloop will eat away at that. Meanwhile, on a single thread you would not be taking advantage of the dynamic load-balancing capabilities of the Cats Effect work-stealing threadpool.

If you are using Aeron for its clustering capabilities, one strategy may be to use the Cats Effect runtime for most of your application, and use .evalOn(...) or unsafe interop to carefully interface with the cluster client, as a component of your application.