twitter / finagle

A fault tolerant, protocol-agnostic RPC system
https://twitter.github.io/finagle
Apache License 2.0

Finagle redis client subscribe command fails in 18.6.0 #704

Open shakhovv opened 6 years ago

shakhovv commented 6 years ago

We tried to use the subscribe command against a Redis server that requires auth and got this error: NOAUTH Authentication required.

Expected behavior

The subscribe command should work without errors in finagle-redis when auth is enabled.

Actual behavior

My app can't make any subscriptions, and we get this exception in the log:

SEVERE: Exception propagated to the root monitor!
java.lang.IllegalArgumentException: Unexpected reply type: ErrorReply
    at com.twitter.finagle.redis.exp.SubscribeCommands$SubscriptionManager.onMessage(SubscribeClient.scala:242)
    at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1(SubscribeDispatcher.scala:20)
    at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1$adapted(SubscribeDispatcher.scala:18)
    at com.twitter.util.Promise$Monitored.apply(Promise.scala:202)
    at com.twitter.util.Promise$Monitored.apply(Promise.scala:198)
    at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:90)
    at com.twitter.util.Promise$WaitQueue$$anon$5.run(Promise.scala:85)
    at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
    at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
    at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
    at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
    at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:85)
    at com.twitter.util.Promise$WaitQueue.runInScheduler$(Promise.scala:84)
    at com.twitter.util.Promise$Transformer.runInScheduler(Promise.scala:215)
    at com.twitter.util.Promise.updateIfEmpty(Promise.scala:739)
    at com.twitter.util.Promise.update(Promise.scala:711)
    at com.twitter.util.Promise.setValue(Promise.scala:687)
    at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:122)
    at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:188)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at com.twitter.finagle.netty4.codec.BufCodec$.channelRead(BufCodec.scala:65)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
    at java.lang.Thread.run(Thread.java:745)

Steps to reproduce the behavior

We use this environment: finagle-redis 18.6.0, Redis 4.0.9.

Test:

  import com.twitter.util.Await
  import com.twitter.finagle.Redis
  import com.twitter.io.Buf

  "Subscription with auth" should "work correctly" in {

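    // Authenticate through the regular client first.
    val r = Redis.client.newRichClient("localhost:6379")
    Await.result(r.auth(Buf.Utf8("mypass")))

    // Subscribe to the "test" channel; this is the subscription that triggers the error above.
    val subscriber = r.subscribe(Seq(Buf.Utf8("test"))) { case (_: Buf, message: Buf) =>
      println(Buf.Utf8.unapply(message).get)
    }

    Await.result(subscriber)
    // Publish a message that the handler above is expected to print.
    Await.result(r.publish(Buf.Utf8("test"), Buf.Utf8("test")))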
  }

stefanlance commented 6 years ago

Hi @shakhovv. Thanks for the ticket. I'm able to reproduce this exception locally with the test you've provided. What's curious is that I don't see the exception in the pubsub integration tests. I'll look into this.

stefanlance commented 6 years ago

I see this with Redis 3.2.12 as well as Redis 4.0.10.

hussain21j commented 6 years ago

Did anyone solve this problem?

stefanlance commented 6 years ago

Hi @hussain21j. I took another look at this recently and the reason the pubsub integration tests work is that the servers they spin up don't require authentication. I think the test @shakhovv posted fails because a new service is created when the client tries to send the subscribe command, and this service has not yet authenticated. The call graph I'm looking at is in SubscribeClient: subscribe calls retry calls doRequest calls RedisPool.forSubscription calls factory.newService.

As far as I can tell, the SubscribeClient isn't allowed to send Auth requests as written. This makes authenticating subscriptions a bit tricky.
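To make the diagnosis above concrete, here is a minimal toy model of it. This is not finagle-redis code: `Connection`, `ConnectionFactory`, and every other name in it are hypothetical stand-ins. It assumes only what the comment describes, namely that the server tracks authentication per connection and that the subscribe path obtains a fresh connection from a factory.

```scala
object AuthPerConnectionSketch {
  // Hypothetical reply types for the toy model.
  sealed trait Reply
  case object Ok extends Reply
  final case class ErrorReply(message: String) extends Reply

  // A connection remembers only whether *it* has authenticated.
  final class Connection(requiredPassword: String) {
    private var authenticated = false

    def auth(password: String): Reply =
      if (password == requiredPassword) { authenticated = true; Ok }
      else ErrorReply("ERR invalid password")

    def subscribe(channel: String): Reply =
      if (authenticated) Ok
      else ErrorReply("NOAUTH Authentication required.")
  }

  // Stand-in for a service factory: each call returns a fresh,
  // unauthenticated connection.
  final class ConnectionFactory(requiredPassword: String) {
    def newConnection(): Connection = new Connection(requiredPassword)
  }

  val factory = new ConnectionFactory("mypass")

  // The regular client authenticates on its own connection.
  val commandConnection: Connection = factory.newConnection()
  val authReply: Reply = commandConnection.auth("mypass")

  // The subscribe path asks the factory for a new connection,
  // which has never seen the AUTH command.
  val subscribeConnection: Connection = factory.newConnection()
  val subscribeReply: Reply = subscribeConnection.subscribe("test")
}
```

In this model `authReply` is `Ok` while `subscribeReply` is the NOAUTH error, which matches the symptom in the original report.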

Unfortunately, work on finagle-redis hasn't been prioritized internally as of late, so I'm unsure when a fix will be merged, but I'll try to dig in some more. Of course, feel free to submit a PR if you find a fix.

stefanlance commented 6 years ago

I have a proof of concept for a patch. We can authenticate before we send the subscribe command in SubscribeDispatcher.apply and handle the case of receiving a StatusReply (authentication successful) in SubscriptionManager.onMessage. The fix isn't this simple because it requires some extra machinery that will store the password provided on the first Auth command and pass it to SubscribeDispatcher.apply, but in theory this is possible.
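The shape of that idea can be sketched as follows. This is a self-contained illustration, not the actual patch: the command and reply types, `commandsToSend`, and this `onMessage` are hypothetical and only mirror the names mentioned in the comment. The sketch assumes the stored password is available as an `Option[String]`.

```scala
object AuthBeforeSubscribeSketch {
  // Hypothetical command and reply types for illustration only.
  sealed trait Command
  final case class Auth(password: String) extends Command
  final case class Subscribe(channels: Seq[String]) extends Command

  sealed trait Reply
  final case class StatusReply(message: String) extends Reply
  final case class ErrorReply(message: String) extends Reply
  final case class MessageReply(channel: String, payload: String) extends Reply

  // What the dispatcher would write to a fresh connection:
  // an Auth command first (if a password was stored), then the Subscribe.
  def commandsToSend(password: Option[String], sub: Subscribe): Seq[Command] = {
    val authStep: Seq[Command] = password.map(p => Auth(p)).toSeq
    authStep :+ sub
  }

  // The message handler gains a case for the status reply that a
  // successful Auth produces, instead of treating it as unexpected.
  def onMessage(reply: Reply): Either[String, String] = reply match {
    case StatusReply(_)              => Right("authenticated")
    case MessageReply(channel, body) => Right(s"$channel: $body")
    case ErrorReply(message)         => Left(message)
  }
}
```

The extra machinery the comment mentions corresponds to the `password` argument here: something has to remember the password from the first Auth command and hand it to the code that opens the subscription connection.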

stefanlance commented 6 years ago

The problem with the above approach is that it requires storing and passing around the password between the client and the subscribe dispatcher. Here are some other ideas:

  1. Allow a password to be passed to the Subscribe command. This isn't ideal since it's not how vanilla redis clients interact with the server, but I think it would suffice as a workaround.
  2. Look into why the Subscribe command creates a new service and check if we can get away without doing so. (The new service is what requires re-authenticating.)

As I mentioned earlier, we aren't prioritizing work on finagle-redis internally, and we've decided not to fix this unless it gets more attention. Hopefully there's enough detail here for a motivated external user to pick this up and submit a patch, if desired.

amishra123 commented 5 years ago

I'd be interested in working on this.

dmackenthun commented 3 years ago

Hi All,

I'm using version 20.4.1. When creating the Redis client using the withPassword option, I'm able to authenticate on the fly. The issue now is that whenever I run a subscribe command with auth enabled, I get:

java.lang.IllegalArgumentException: Not a subscribe/unsubscribe command
    at com.twitter.finagle.redis.exp.SubscribeDispatcher.apply(SubscribeDispatcher.scala:54)

I believe the issue is that SubscribeDispatcher receives an Auth command before it receives the Subscribe command. When this happens, a catch-all for non-subscribe commands throws the exception. Has anyone else seen this issue or been able to work around it?
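The behavior described above can be illustrated with a small pattern-matching sketch. It is a toy model under the assumption that the dispatcher matches on the command type and rejects everything else; the types and both `dispatch` functions are hypothetical and are not the finagle-redis implementation.

```scala
object DispatcherCatchAllSketch {
  // Hypothetical command types for illustration only.
  sealed trait Command
  final case class Auth(password: String) extends Command
  final case class Subscribe(channels: Seq[String]) extends Command
  final case class Unsubscribe(channels: Seq[String]) extends Command

  // Mirrors the reported behavior: anything that is not a
  // subscribe or unsubscribe command hits the catch-all and throws.
  def dispatch(cmd: Command): String = cmd match {
    case Subscribe(channels)   => s"SUBSCRIBE ${channels.mkString(" ")}"
    case Unsubscribe(channels) => s"UNSUBSCRIBE ${channels.mkString(" ")}"
    case _ =>
      throw new IllegalArgumentException("Not a subscribe/unsubscribe command")
  }

  // One possible direction, stated as an assumption rather than a known fix:
  // recognize Auth explicitly before falling back to the strict dispatcher.
  def dispatchAllowingAuth(cmd: Command): String = cmd match {
    case Auth(_) => "AUTH"
    case other   => dispatch(other)
  }
}
```

With this model, `dispatch(Auth("mypass"))` throws the same IllegalArgumentException message as in the report, while `dispatchAllowingAuth` lets the Auth command through and still rejects nothing that `dispatch` accepts.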