etaty / rediscala

Non-blocking, Reactive Redis driver for Scala (with Sentinel support)
Apache License 2.0
790 stars 143 forks source link

How to read from replicated slaves of a Redis cluster of shards #163

Open evbo opened 7 years ago

evbo commented 7 years ago

What is the protocol for telling rediscala to read from slaves and not master for an LRANGE operation?

Starting with Redis-cli, this can be accomplished on connecting to the slave and running: READONLY. Then, subsequent LRANGE commands will read directly from the slave for that client connection.

With Rediscala, I noticed that lrange() calls Lrange() which seems to have a boolean attribute that indicates it is not exclusively operating on master nodes:

case class Lrange[K, R](key: K, start: Long, stop: Long)(implicit redisKey: ByteStringSerializer[K], deserializerR: ByteStringDeserializer[R]) extends SimpleClusterKey[K] with RedisCommandMultiBulk[Seq[R]] {
  **val isMasterOnly = false**
  val encodedRequest: ByteString = encode("LRANGE", Seq(keyAsString, ByteString(start.toString), ByteString(stop.toString)))

  def decodeReply(mb: MultiBulk) = MultiBulkConverter.toSeqByteString(mb)
}

However, when restricting my RedisServer list to only slave nodes, I cannot get a connection to any of the slaves upon performing the lrange():

val redisIps = sys.env.getOrElse("REDIS_IPS", "localhost").split(",")
  val redisPort = 6379

  val cluster = RedisCluster(redisIps.map(host => RedisServer(host, redisPort)))

  val setReadOnlyBeforeSend = new RedisCommands {
    override def send[T](redisCommand: RedisCommand[_ <: RedisReply, T]): Future[T] = {
      cluster.send(ReadOnly)
      cluster.send(redisCommand)
    }
    override implicit val executionContext: ExecutionContext = materializer.executionContext
  }

  cluster.redisServers.foreach(replica => cluster.onConnect(setReadOnlyBeforeSend,replica))

  cluster.lrange("some key", 0, -1)

the lrange fails with:

java.lang.RuntimeException: server not found: no server available
    at redis.RedisCluster$$anonfun$send$2.apply(RedisCluster.scala:161)
    at redis.RedisCluster$$anonfun$send$2.apply(RedisCluster.scala:161)
    at scala.Option.getOrElse(Option.scala:121)
    at redis.RedisCluster.send(RedisCluster.scala:161)
    at redis.commands.Lists$class.lrange(Lists.scala:35)
    at redis.RedisCluster.lrange(RedisCluster.scala:20)
    at akka.http.scaladsl.server.directives.OnSuccessMagnet$$anon$1$$anonfun$1$$anonfun$apply$4.apply(FutureDirectives.scala:97)
    at akka.http.scaladsl.server.directives.OnSuccessMagnet$$anon$1$$anonfun$1$$anonfun$apply$4.apply(FutureDirectives.scala:95)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:42)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:42)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
    at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:60)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:60)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:66)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:66)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$5.apply(BasicDirectives.scala:153)
    at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:32)
    at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:28)
    at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:73)
    at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:72)
    at akka.stream.impl.fusing.MapAsync$$anon$21.onPush(Ops.scala:1008)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:649)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
    at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
randomstatistic commented 7 years ago

This issue appears more generic to me than just lrange, it looks like it applies to GET, and potentially every operation.

From what I can tell from looking at the code, although RedisCluster figures out the master and slaves for each slot, it then never pays any attention to the slaves list. It only evaluates masters when trying to find the right connection for a command.

So it would seem that putting any read replicas into the list of RedisServer you provide to RedisCluster is pointless. (Unless, maybe, one of them gets promoted to be a master? I'm not sure about that.)

l15k4 commented 7 years ago

Wait, if we put sharding aside, does it mean that the READ load is not distributed among all nodes and all GET queries are hitting the master? I was expecting WRITES to go only to master and READs to be distributed in a round robin fashion.

From what I can see in the code, any load is really not distributed to replicas, this is the place where it looks like it ignores all connections except for Master : https://github.com/etaty/rediscala/blob/master/src/main/scala/redis/RedisCluster.scala#L65