twitter / finagle

A fault tolerant, protocol-agnostic RPC system
https://twitter.github.io/finagle
Apache License 2.0
8.78k stars 1.45k forks source link

How to use the partitioning strategy with the http client? #953

Open gabrielgiussi opened 1 year ago

gabrielgiussi commented 1 year ago

Is your feature request related to a problem? Please describe. I want to perform load balancing based on the request to g et affinity with servers that have a hot cache for the data each request will access. Currently this can't be done in the Load Balancer layer because the pick method doesn't have access to the request.

Describe the solution you'd like I think having a partitioning aware client could be the solution, but I wonder why the service ConsistentHashPartitioningService is private, which makes it impossible to reuse with an Http Client. Even if that class wasn't private, I'm not sure how the wiring should be done, it seems I need to use the Stack API.

Describe alternatives you've considered Using a ServiceFactory to instantiate a service per partition (since it performs LB only once), but this makes using the client much harder since now I have to maintaining this mapping of key to service. Besides that, I don't have a way to achieve the affinity, aka telling that service instance which is the subset of nodes that should use to load balance.

xin301x commented 1 year ago

这是来自QQ邮箱的假期自动回复邮件。   您好,我最近正在休假中,无法亲自回复您的邮件。我将在假期结束后,尽快给您回复。

gabrielgiussi commented 1 year ago

I was able to use the ConsistentHashPartitioningService with the Http client and it is working as expected, the HashRingNodeManager returns one Service from the ring and this only sees one Host, so whatever load balancer strategy was configured it will always route to that node.

However, I want to also add a new behavior which is fallback to the configured load balancing strategy when the header that includes the partition key is not present. The ConsistentHashPartitioningService doesn't allow such fallback, since we can only return a failed feature when the partition key is not present, overriding noPartitionInformationHandler. The approach I've implemented is to return a Service wrapper that checks for the presence of the header, in which case it forwards to the partitioning service, otherwise it materializes a ServiceFactory from the stack and forwards to it. This last part is the one I don't understand if it is semantically correct and/or safe in terms of resource utilization, but it achieves the intended behavior.

Here is the module

import com.twitter.finagle
import com.twitter.finagle.Stack.{Module0, Params}
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.partitioning.{ConsistentHashPartitioningService, PartitioningService}
import com.twitter.finagle.partitioning.param.{KeyHasher, NumReps}
import com.twitter.util.Future
import sun.nio.cs.UTF_8

object AffinityHttpClientModule {

  val role = Stack.Role("HttpPartitioning")

  val module = new Stack.Module[ServiceFactory[Request,Response]] {

    val parameters = Seq(
      implicitly[Stack.Param[LoadBalancerFactory.Dest]],
      implicitly[Stack.Param[finagle.param.Stats]]
    )

    def newConsistentHashPartitioningService(underlying: Stack[ServiceFactory[Request, Response]],
                                             params: Params
                                            ): ConsistentHashPartitioningService[Request, Response, String] = {
      val KeyHasher(hasher) = params[KeyHasher]
      val NumReps(numReps) = params[NumReps]

      new ConsistentHashPartitioningService[Request,Response,String](underlying,params,hasher,numReps) {

        override protected def getKeyBytes(key: String): Array[Byte] = key.getBytes(UTF_8.INSTANCE)

        override protected def getPartitionKeys(request: Request): Iterable[String] = {
          request.headerMap.get("request-key").toList
        }

        override protected def createPartitionRequestForKeys(original: Request, keys: Seq[String]): Request = {
          // TODO We are not creating multiple requests from one so...
          original
        }

        override protected def mergeResponses(originalReq: Request, results: PartitioningService.PartitionedResults[Request, Response]): Response = {
          // TODO I'm not doing N requests so this should be enough
          results.successes.toList match {
            case (_,res)::_ => res
            case _ => throw results.failures.toList.head._2
          }
        }

        override protected def noPartitionInformationHandler(req: Request): Future[Nothing] = {
          Future.exception(new Exception())
        }
      }
    }

    final override def make(
                             params: Params,
                             next: Stack[ServiceFactory[Request, Response]]
                           ): Stack[ServiceFactory[Request, Response]] = {
      val partitioningService: Service[Request, Response] = newConsistentHashPartitioningService(next, params)

      // TODO validate if it is ok to materialize the stack here
      val serviceFactory = next.make(params)

      Stack.leaf(role, ServiceFactory.const(new Service[Request,Response]{
        override def apply(request: Request): Future[Response] = {
          partitioningService(request)
          // if the request key is set choose the host based on the consistent hash of the key
          if (request.headerMap.contains("request-key"))
            partitioningService(request)
          else {
            // otherwise, load balance using the configured load balancer
            serviceFactory.apply().flatMap(_.apply(request))
          }

        }
      }))
    }

    override def role: Stack.Role = AffinityHttpClientModule.role

    override def description: String = ""
  }

}

And then I do a insertAfter in the http client stack

  def client: Http.Client = Client().withStack(_.insertAfter(
    BindingFactory.role,
    AffinityHttpClientModule.module
  ))