SpinGo / op-rabbit

The Opinionated RabbitMQ Library for Scala and Akka

Handling 403 ACCESS_REFUSED for exclusive consumers #167

Open mturlo opened 5 years ago

mturlo commented 5 years ago

With #134 closed, we're now able to define an exclusive consumer for a queue. According to the specification, the broker does not allow more than one exclusive consumer on a queue and responds with 403 ACCESS_REFUSED when a second one attempts to consume.

In terms of RabbitMQ's Java driver, the channel.basicConsume() call throws a java.io.IOException with a com.rabbitmq.client.ShutdownSignalException as its cause. The trouble is that this sends ChannelActor into a perpetual dropChannelAndRequestNewChannel() cycle. That may be valid from a usability point of view (we may want to keep competing to become the exclusive consumer), but it results in a tight, CPU-intensive loop.
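
To make the "IOException with a ShutdownSignalException as cause" shape concrete, here is a minimal sketch of walking a cause chain to find a throwable of a given type. `CauseChain` and `find` are hypothetical names for illustration only and are not part of op-rabbit or the RabbitMQ client; the sketch uses only the standard library.

```scala
import scala.annotation.tailrec
import scala.reflect.ClassTag

// Hypothetical helper, not part of op-rabbit or the RabbitMQ Java client.
object CauseChain {
  // Guard against pathological (cyclic or very deep) cause chains.
  private val MaxDepth = 32

  /** Returns the first throwable of type T in the cause chain of `t`,
    * starting with `t` itself, or None if there is no such throwable.
    */
  def find[T <: Throwable](t: Throwable)(implicit ct: ClassTag[T]): Option[T] = {
    @tailrec
    def loop(current: Throwable, depth: Int): Option[T] =
      if (current == null || depth > MaxDepth) None
      else if (ct.runtimeClass.isInstance(current)) Some(current.asInstanceOf[T])
      else loop(current.getCause, depth + 1)

    loop(t, 0)
  }
}
```

With the RabbitMQ client on the classpath, something like `CauseChain.find[com.rabbitmq.client.ShutdownSignalException](e)` should yield the shutdown signal, whose `getReason` can then be checked for a `Close` method with reply code 403, assuming the exception really is shaped as described above.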

I'm still relatively new to op-rabbit's architecture and connection process, but intuitively it looks like this is because the exception happens once the channel is already considered established. At that point, trying to re-define the channel consumer is the best-effort action for dealing with potential "runtime" consumption errors. I'd be happy to hear a more educated opinion, though :)

With the above in mind:

  1. What exactly do we want to happen once an exclusive consumer is denied consumption? Stop further connection attempts immediately? Keep retrying, albeit in a more "reasonable" manner? Leave that decision to the user and expose options to define that behaviour?
  2. For the simplest scenario (consumer termination), we can recognise this case in setupSubscription()'s catch block, as we potentially have access to both the subscription definition and exception details - and just propagate the exception. So it would be like:
    case ShuttingDownException(e) =>
      e.getReason match {
        case close: Close if close.getReplyCode == 403 && subscription.exclusive =>
          log.warning("{} and subscription exclusive set to true - propagating exception", close.getReplyText)
          throw e
        case _ =>
          None
      }

    But that may be too crude?
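
For the "keep retrying, but more reasonably" option in point 1, one possibility is a capped exponential backoff between attempts. The following is only a rough sketch of the delay calculation: `Backoff` and `delayFor` are hypothetical names, nothing here exists in op-rabbit today, and how the delay would be wired into ChannelActor (for example via a scheduled message) is left open.

```scala
import scala.concurrent.duration._

// Hypothetical retry policy for illustration; not an op-rabbit API.
final case class Backoff(initial: FiniteDuration, max: FiniteDuration, factor: Double = 2.0) {
  require(initial > Duration.Zero, "initial delay must be positive")
  require(max >= initial, "max delay must not be smaller than the initial delay")
  require(factor >= 1.0, "factor must be at least 1.0")

  /** Delay to wait before retry number `attempt` (0-based), capped at `max`. */
  def delayFor(attempt: Int): FiniteDuration = {
    require(attempt >= 0, "attempt must be non-negative")
    // Computed in Double so that very large attempts overflow to Infinity
    // and fall through to the cap instead of wrapping around.
    val scaledMillis = initial.toMillis.toDouble * math.pow(factor, attempt.toDouble)
    if (scaledMillis >= max.toMillis.toDouble) max else scaledMillis.toLong.millis
  }
}
```

For example, `Backoff(100.millis, 5.seconds)` would wait 100 ms before the first retry, double the delay on each subsequent one, and never wait longer than 5 seconds, which would at least get rid of the tight loop while still letting the consumer compete for the queue.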