SpinGo / op-rabbit

The Opinionated RabbitMQ Library for Scala and Akka

Subscription `closed` future is never completed when channel creation fails #162

Closed oridag closed 5 years ago

oridag commented 5 years ago

When channel creation fails (for example, because the queue does not exist), there seems to be no way to react from outside the subscription:

```scala
import akka.actor.{ActorSystem, Props}
import com.spingo.op_rabbit.{Queue, RabbitControl, RecoveryStrategy, Subscription}

import scala.concurrent.ExecutionContextExecutor

object Test extends App {
  val actorSystem = ActorSystem("actorSystem")
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
  implicit val recoveryStrategy: RecoveryStrategy = RecoveryStrategy.none
  import com.spingo.op_rabbit.Directives._
  val subscriptionRef = Subscription.run(rabbitControl) {
    channel() {
      consume(Queue.passive("non-exisiting-queue")) {
        body(as[String]) { body =>
          println(s"""got message '$body'""")
          ack
        }
      }
    }
  }
  subscriptionRef.closed.foreach(_ => {println("Will never be printed"); actorSystem.terminate()})
}
```

DStranger commented 5 years ago

Hi @oridag!

When a subscription fails, the `closed` future does not complete successfully; it fails with the cause of the failure. Since `foreach` only runs on success, handle the failure with `recover`:

```scala
import akka.actor.{ActorSystem, Props}
import com.spingo.op_rabbit.{Queue, RabbitControl, RecoveryStrategy, Subscription}
import scala.concurrent.ExecutionContextExecutor

object Test extends App {

  val actorSystem = ActorSystem("actorSystem")
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
  implicit val recoveryStrategy: RecoveryStrategy = RecoveryStrategy.none
  import com.spingo.op_rabbit.Directives._
  val subscriptionRef = Subscription.run(rabbitControl) {
    channel() {
      consume(Queue.passive("non-exisiting-queue")) {
        body(as[String]) { body =>
          println(s"""got message '$body'""")
          ack
        }
      }
    }
  }

  subscriptionRef.closed.recover {
    case t =>
      println(s"Failed to subscribe because of ${t.getMessage}")
  }.foreach(_ => actorSystem.terminate())
}
```

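
The difference between a successful and a failed `closed` future can be reproduced without a broker. The sketch below uses only `scala.concurrent`; `ClosedFutureDemo`, `describe`, and the `"boom"` exception are hypothetical names for illustration, and the failed future is merely a stand-in for what `subscriptionRef.closed` would look like after a failure, not op-rabbit's own code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ClosedFutureDemo {
  // Turn either outcome of a `closed`-style future into a message.
  // `map` only runs on success; `recover` only runs on failure.
  def describe(closed: Future[Unit]): Future[String] =
    closed
      .map(_ => "closed normally")
      .recover { case t => s"failed: ${t.getMessage}" }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for a subscription whose channel could not be created.
    val closed: Future[Unit] = Future.failed(new RuntimeException("boom"))

    // A plain `closed.foreach(...)` would never fire here, because the future failed.
    println(Await.result(describe(closed), 1.second)) // prints "failed: boom"
  }
}
```

If both outcomes should trigger the same cleanup (such as terminating the actor system), `onComplete` or `recover` followed by `foreach`, as in the example above, covers both cases.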
DStranger commented 5 years ago

Closing, because this is intended behavior.

oridag commented 5 years ago

Got it, makes sense. Thanks, @DStranger!