SpinGo / op-rabbit

The Opinionated RabbitMQ Library for Scala and Akka
Other
232 stars 73 forks source link

StatusCheckMessage does not use specified timeout #128

Open tilmanginzel opened 6 years ago

tilmanginzel commented 6 years ago

Hi,

we use the StatusCheckMessage to verify if a connection to rabbitmq is established. It looks like the specified timeout is never used, so the status check never runs into the timeout. Apparently the private method withTimeout is unused.

This example code never fails, even if rabbitmq is not available:

def health(): Future[Done] = {
  val statusCheckMessage = new StatusCheckMessage(2 seconds)
  rabbitControl ! statusCheckMessage

  statusCheckMessage.okay map {
    _ => Done
  } recover {
    case t: Throwable => throw t
  }
}

Workaround: We created our own withTimeout function and wrapped it around the Future.

def withTimeout[T](duration: FiniteDuration)(f: => Future[T]): Future[T] = {
  val timer = akka.pattern.after(duration, using = system.scheduler) {
    Future.failed(new scala.concurrent.TimeoutException(s"Response not received after $duration."))
  }
  Future.firstCompletedOf(timer :: f :: Nil)
}

def health(): Future[Done] = {
  val statusCheckMessage = new StatusCheckMessage()
  rabbitControl ! statusCheckMessage

  withTimeout(2 seconds) {
    statusCheckMessage.okay map {
      _ => Done
    } recover {
      case t: Throwable => throw t
    }
  }
}

Cheers :)

afrancoc2000 commented 6 years ago

This looks great! thanks I'm going to try it :)

afrancoc2000 commented 6 years ago

Hi,

I tried it with a timers.startPeriodicTimer() method to check the health every minute, the weird thing is that even when I see all consumers went down and the producer gets a timeout with every message it sends I'm not getting any error and the future always returns "Done".

I'm thinking on sending a message instead of doing the status check, using ask and kill the rabbit connection if I get a timeout, hoping the supervisor will restart the connection.

What I don't like is that I will be getting a lot of messages I don't need in my queue.

Have you had a similar problem before?

Its like the connection is alive but is useless

afrancoc2000 commented 6 years ago

I'm thinking on something like this:

def checkHealth(rabbitControl: ActorRef): Future[ConfirmResponse] = {
  val keyPublisher = Publisher.exchange(RABBIT_EXCHANGE)
  val received: Future[ConfirmResponse] = (
      rabbitControl ? Message(s"$address ${rabbitControl.path.name} is healthy", keyPublisher)
      ).mapTo[ConfirmResponse]

  received.onFailure{
    case e =>
      LogHelper.logger.info(s"$address ${rabbitControl.path.name} is not healthy: ${e.getMessage}")
      rabbitControl ! Kill
  }

  received
}