twitter / finagle

A fault tolerant, protocol-agnostic RPC system
https://twitter.github.io/finagle
Apache License 2.0
8.79k stars 1.46k forks

Pass Finagle Future into FuturePool Threads #862

Open politrons opened 4 years ago

politrons commented 4 years ago

Basically my problem is that Finagle by default uses an ExecutionContext with the same number of threads as the cores on my machine. With blocking operations that number is not enough for good performance, so I would like to use another ExecutionContext with more threads.

Here is what I have tried so far:

  import java.util.concurrent.Executors
  import com.twitter.util.{ExecutorServiceFuturePool, Future, FuturePool, Promise}

  private val executorService = Executors.newFixedThreadPool(100)
  private val pool: ExecutorServiceFuturePool = FuturePool(executorService)

  private def shiftThread[T](response: Future[T]): Future[T] = {
    val shifted = Promise.interrupts[T](response)
    response.respond { t =>
      log.info(null, s"FINAGLE RESPONSE $t IN THREAD ${Thread.currentThread().getName}")
      pool(shifted.update(t))
    }
    shifted // return the promise that is completed on the pool thread
  }

But the only thing I achieve is to move the response continuation into the FuturePool; what I would like is for the request itself to already run in the FuturePool.

Here, as you can see, the request/response is handled on the finagle/netty thread pool:

FINAGLE RESPONSE Return(Response("HTTP/1.1 Status(200)")) IN THREAD finagle/netty4-1-10

andrievsky commented 4 years ago

Hi, if you would like Netty to map the underlying request processing to a custom thread pool executor, there is probably a way, but I'm not sure what you're hoping to achieve. Maybe this example with and without a FuturePool could help:

Before

  val service = new Service[http.Request, http.Response] {
    def apply(req: http.Request): Future[http.Response] = {
      doSomeHeavyLifting() // Blocks a Netty thread

      Future.value(
        http.Response(req.version, http.Status.Ok)
      )
    }
  }
...

After

  val service = new Service[http.Request, http.Response] {
    def apply(req: http.Request): Future[http.Response] =
      FuturePools.unboundedPool() {
        doSomeHeavyLifting() // Blocks a pool thread, not a Netty thread

        http.Response(req.version, http.Status.Ok)
      }
  }
...

politrons commented 4 years ago

Hi, thanks for the answer. As I described in the original post, the problem is that Finagle by default uses an ExecutionContext with as many threads as my machine has cores, and with blocking operations that is not enough for good performance; that is why I tried the shiftThread snippet above with a 100-thread FuturePool. But the only thing I achieve is to move the response continuation into the FuturePool; what I would like is for the request itself to already run in the FuturePool. As you can see, the request/response is still handled on the finagle/netty thread pool:

FINAGLE RESPONSE Return(Response("HTTP/1.1 Status(200)")) IN THREAD finagle/netty4-1-10

hamdiallam commented 4 years ago

Where is the blocking code being run? The same thread will run continuations for Twitter Futures & Promises.

If you adjust the following, I believe you'll see the desired effect:

  shifted.respond { t =>
    log.info(null, s"FINAGLE RESPONSE $t IN THREAD ${Thread.currentThread().getName}")
  }

With the sample you've posted, response is completed on a Finagle thread, so the log statement occurs in a continuation of a promise satisfied by a Finagle/Netty thread. You want to shift the execution (using the FuturePool) first, then run the expensive code on the Future returned by the FuturePool, which in your sample is shifted.
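A minimal sketch of that suggestion applied to the earlier filter (doSomeBlockingWork, client, and request are hypothetical placeholders; the point is that work chained on shifted, not on response, runs on the pool's threads):

```scala
import com.twitter.util.{Future, FuturePool, Promise}

private def shiftThread[T](pool: FuturePool)(response: Future[T]): Future[T] = {
  val shifted = Promise.interrupts[T](response)
  // Complete the promise from a pool thread; continuations on `shifted`
  // therefore run on that pool thread, not the Netty thread.
  response.respond(t => pool(shifted.update(t)))
  shifted
}

// Continuations attached to the returned future run on a pool thread:
shiftThread(myPool)(client(request)).map { rep =>
  doSomeBlockingWork(rep) // hypothetical blocking call, now off the Netty thread
  rep
}
```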

politrons commented 4 years ago

Hi @hamdiallam. The code I posted was extracted from a filter that tries to do exactly that. The change you suggest does not work, and I think that's logical: the code just transfers the result from the Finagle thread into the promise, but by then it's too late, since the whole request/response has already been done on the Finagle Netty thread.

Code here https://github.com/twitter/finagle/blob/develop/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala#L89

As I see this is a dead end, I will stop here and just use the Java parameter we can pass to the process to specify the number of workers.

You can close this ticket unless someone else knows another workaround.

@hamdiallam thanks for all the support mate, cheers

andrievsky commented 4 years ago

@politrons From my experience, you should never block Netty threads if you're looking for high and predictable performance of your service; offload blocking code to another pool instead.

You could try two things, both described here: https://twitter.github.io/finagle/guide/ThreadingModel.html

One option is to just increase the number of threads, per entire application (JVM process), using command-line flags:
-com.twitter.finagle.offload.numWorkers=14 -com.twitter.finagle.netty4.numWorkers=10
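For example, a hypothetical launch command (myservice.jar is a placeholder for your own build; the flags are parsed from the program arguments by a TwitterServer-style main):

```shell
# Start the service with 10 Netty I/O threads and a 14-thread offload pool.
java -jar myservice.jar \
  -com.twitter.finagle.netty4.numWorkers=10 \
  -com.twitter.finagle.offload.numWorkers=14
```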

The other option is to offload blocking/heavy code.

Offloading can be done on a per-method (endpoint) basis:


import com.twitter.util.{Future, FuturePool}

def offloadedPermutations(s: String, pool: FuturePool): Future[String] =
  pool(s.permutations.mkString("\n"))
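A hypothetical call site for the snippet above (the 4-thread pool size is illustrative, and Await is only for the example; you wouldn't block like this in server code):

```scala
import java.util.concurrent.Executors
import com.twitter.util.{Await, Future, FuturePool}

def offloadedPermutations(s: String, pool: FuturePool): Future[String] =
  pool(s.permutations.mkString("\n"))

// Hypothetical fixed-size pool, sized for the blocking workload.
val ioPool: FuturePool = FuturePool(Executors.newFixedThreadPool(4))

// The permutations are computed on a pool thread; "abc" has 6 of them.
val lines: String = Await.result(offloadedPermutations("abc", ioPool))
```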

As well as per entire client or server:

import com.twitter.util.FuturePool
import com.twitter.finagle.Http

val server: Http.Server = Http.server
  .withExecutionOffloaded(FuturePool.unboundedPool)

val client: Http.Client = Http.client
  .withExecutionOffloaded(FuturePool.unboundedPool)

politrons commented 4 years ago

We're using version 18.11.0, which does not contain withExecutionOffloaded. Like I said before, since it seems it's not possible to extend the number of workers programmatically, I will use com.twitter.finagle.netty4.numWorkers=???

Thanks

hamdiallam commented 4 years ago

@politrons

Can you clarify how the suggestion I posted does not work? I tried it out just now and it does. It's the same mechanism as in the OffloadFilter.

In this example you can consider executorA to be the finagle/netty threads and executorB the application worker threads:

import java.util.concurrent.Executors
import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.util.{Future, FuturePool, Promise}

val executorA = Executors.newCachedThreadPool(new NamedPoolThreadFactory("GroupA", true))
val executorB = Executors.newCachedThreadPool(new NamedPoolThreadFactory("GroupB", true))
val poolA = FuturePool(executorA)
val poolB = FuturePool(executorB)

def shiftToB[T](response: Future[T]): Future[T] = {
  val shifted = Promise.interrupts[T](response)
  response.respond { t => poolB(shifted.update(t)) }
  shifted
}

shiftToB(poolA(1)).map { _ =>
  // blocking code needs to run **after** shifting, as a continuation of `shifted`,
  // which is updated by a thread in `executorB` as a result of `poolB(shifted.update(t))`.
  // Hence the assigned GroupB thread will also run all of the continuations on `shifted`.
  println(Thread.currentThread().getName)
}

outputs "GroupB-1"