Closed amsjavan closed 5 years ago
This is something you'll have to roll yourself.
Send a GetConnectionActor ask request to rabbitControl. From there you'll have a reference to the akka-rabbitmq connection actor and can do whatever you like.
Here is my solution:
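import akka.actor.ActorRef
import com.rabbitmq.client.{Channel, Method}
import com.spingo.op_rabbit.MessageForPublicationLike
import scala.concurrent.{Future, Promise}

case class RabbitHelper(rabbitControl: ActorRef) {

  // Sends f to rabbitControl so that it is applied to a channel.
  // f is expected to complete the promise itself.
  def withChannel(f: (Channel, Promise[Method]) ⇒ Unit): Future[Method] = {
    val promise = Promise[Method]()
    val msg = new MessageForPublicationLike {
      // Do not drop the message when no channel is currently available.
      override val dropIfNoChannel: Boolean = false
      override def apply(ch: Channel): Unit = {
        try {
          f(ch, promise)
        } catch {
          // tryFailure avoids a second exception if f already completed the promise.
          case e: Throwable ⇒ promise.tryFailure(e)
        }
      }
    }
    rabbitControl ! msg
    promise.future
  }

  // Binds queueName to exchange using topic as the routing key.
  def bind(
    queueName: String,
    topic: String,
    exchange: String = RabbitConfig.systemConfig.getString("topic-exchange-name")): Future[Method] =
    withChannel { (ch, p) ⇒
      ch.queueBind(queueName, exchange, topic) match {
        case rsp: com.rabbitmq.client.AMQP.Queue.BindOk ⇒ p.success(rsp)
        case _ ⇒ p.failure(new RuntimeException("Failed to bind"))
      }
    }

  // Removes the binding between queueName and exchange for topic.
  def unbind(
    queueName: String,
    topic: String,
    exchange: String = RabbitConfig.systemConfig.getString("topic-exchange-name")): Future[Method] =
    withChannel { (ch, p) ⇒
      ch.queueUnbind(queueName, exchange, topic) match {
        case rsp: com.rabbitmq.client.AMQP.Queue.UnbindOk ⇒ p.success(rsp)
        case _ ⇒ p.failure(new RuntimeException("Failed to unbind"))
      }
    }
}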
The problem is that these bind and unbind calls are not re-applied after a reconnection. Any ideas?
This is something that akka-rabbitmq handles for you. I recommend you have a look at the docs:
https://github.com/NewMotion/akka-rabbitmq#setup-channel
In this example, a channel is created and the function is applied to configure it. If RabbitMQ disconnects, the topology is declared again: the channels are recreated and the provided functions are re-applied.
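To make the re-apply idea concrete, here is a minimal sketch that uses only the Scala standard library. It is a model of the pattern, not akka-rabbitmq or op-rabbit code: FakeChannel, BindingRegistry, onChannelCreated and onChannelLost are all hypothetical names invented for illustration, and FakeChannel merely records calls instead of talking to a broker. The registry remembers which bindings should exist and replays them whenever a new channel shows up, so a binding that was removed earlier does not come back after a reconnect.

```scala
import scala.collection.mutable

// Hypothetical stand-in for com.rabbitmq.client.Channel; it only records bindings.
final class FakeChannel {
  val bound: mutable.Set[(String, String, String)] = mutable.Set.empty
  def queueBind(queue: String, exchange: String, routingKey: String): Unit =
    bound += ((queue, exchange, routingKey))
  def queueUnbind(queue: String, exchange: String, routingKey: String): Unit =
    bound -= ((queue, exchange, routingKey))
}

// Hypothetical registry of desired bindings. Not thread-safe; in a real
// application this state would live inside a single actor.
final class BindingRegistry(exchange: String) {
  private val desired = mutable.LinkedHashSet.empty[(String, String)] // (queue, topic)
  private var current: Option[FakeChannel] = None

  def bind(queue: String, topic: String): Unit = {
    desired += ((queue, topic))
    current.foreach(_.queueBind(queue, exchange, topic))
  }

  def unbind(queue: String, topic: String): Unit = {
    desired -= ((queue, topic))
    current.foreach(_.queueUnbind(queue, exchange, topic))
  }

  // Would be called from the channel setup function each time a channel is (re)created.
  def onChannelCreated(ch: FakeChannel): Unit = {
    current = Some(ch)
    desired.foreach { case (q, t) => ch.queueBind(q, exchange, t) }
  }

  def onChannelLost(): Unit = current = None
}

object ReconnectDemo {
  // Binds two topics, unbinds one, simulates a reconnect, and returns
  // the bindings present on the replacement channel.
  def run(): Set[(String, String, String)] = {
    val registry = new BindingRegistry("topics")
    registry.onChannelCreated(new FakeChannel)
    registry.bind("q", "a.*")
    registry.bind("q", "b.*")
    registry.unbind("q", "a.*")
    registry.onChannelLost()
    val second = new FakeChannel
    registry.onChannelCreated(second)
    second.bound.toSet
  }
}
```

With a real client, the replay step would sit in whatever setup function the library re-runs on reconnect, and bind/unbind would also have to cope with the channel being temporarily unavailable.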
I subscribe a list of topics to a queue (one subscriber actor). How can I unbind the queue from just one of those topics?