Closed matejuh closed 4 years ago
If I understand correctly, the current behavior is to cancel a consumer flux when its queue is deleted, and you would like the consumer not to be canceled and to resume when the queue is created again?
This cannot be the default behavior: what if the queue is never re-created? We cannot keep resources around forever. And what if a queue with the same name is created later but has nothing to do with the first one? The "paused" consumer would then start consuming messages from an unrelated queue.
We have to narrow down the semantics here.
A workaround right now would be to retry starting the consumer flux in the application.
I can argue that if you have an auto-delete queue and there is a network hiccup, then all apps must be restarted to start the consumer flux again (if there is no other mechanism). The same goes for longer hiccups combined with a short TTL on the queue. But you are right that there are workarounds, and it probably depends on application logic to restart the consumer.
Just for the record (and maybe you would be so kind as to validate my solution, please), I found the following cases:
setTopologyRecoveryRetryHandler(TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build())
must be set; otherwise re-creating the consumer fails on a missing queue (a retry is needed).
Consumer flux:
private val flux = rabbitReceiver
    .consumeAutoAck(queueName, consumeOptions)
    .delaySubscription(rabbitDeclarationPublisher)
    .retryWhen(retrySpec)
subscribe:
fun subscribe() {
    flux.subscribe(
        { message ->
            log.debug { "action=consumed_message message=$message" }
        },
        { t ->
            // The flux terminated with an error: log it and subscribe again.
            log.error(t) { "action=subscriber_completed status=error queue=$queueName" }
            this.subscribe()
        },
        {
            // The flux completed (the consumer was cancelled): subscribe again.
            log.warn { "action=subscriber_completed status=cancelled queue=$queueName" }
            this.subscribe()
        }
    )
}
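The recursive `subscribe()` above can also be expressed with Reactor operators, so that a single subscription chain handles both completion and errors. This is only a minimal sketch, assuming Reactor Core is on the classpath. `resubscribing`, `delay` and `source` are hypothetical names that are not part of Reactor RabbitMQ, and the supplier stands in for something like `rabbitReceiver.consumeAutoAck(queueName, consumeOptions)`.

```kotlin
import reactor.core.publisher.Flux
import reactor.util.retry.Retry
import java.time.Duration

// Hypothetical helper (not a library API): subscribes to a freshly created
// source flux again whenever the previous one completes or fails.
fun <T : Any> resubscribing(delay: Duration, source: () -> Flux<T>): Flux<T> =
    Flux.defer { source() }
        // Completion (e.g. the consumer was cancelled): wait, then subscribe again.
        .repeatWhen { completions -> completions.delayElements(delay) }
        // Error: retry without an attempt limit, with a fixed delay between attempts.
        .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, delay))
```

Note that this sketch retries forever, which runs into the resource concern raised earlier in the thread; a bounded `Retry` spec or a timeout would be the place to narrow the semantics.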
I'm not sure I'm following you. What is rabbitDeclarationPublisher? What are you trying to achieve?
When a queue is deleted, the consumer cancel callback is called for each consumer, to inform them they will not receive messages anymore. Those consumers do not exist anymore for the broker. Applications (or higher-level frameworks like Reactor RabbitMQ) can take the initiative to try to re-consume from the queue. This is typically a retried operation with some timeout, and it could be an opt-in feature in Reactor RabbitMQ. Is it something you'd like to see?
rabbitDeclarationPublisher is a Mono which creates the topology: exchange, queue and bindings. I want to be sure that the topology is created before I start consuming messages.
It would definitely help to have this as an opt-in in the library itself. Currently it's not possible, for example, to distinguish why the consumer was stopped. I think it should not finish with success in case it was cancelled because of connection error.
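For readers wondering what such a declaration Mono might look like, here is a minimal sketch built on the Reactor RabbitMQ `Sender` declaration methods. The function names `declareTopology` and `consumeAfterTopology` and their parameters are assumptions for illustration, not the code actually used in the application.

```kotlin
import com.rabbitmq.client.Delivery
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.rabbitmq.BindingSpecification
import reactor.rabbitmq.ExchangeSpecification
import reactor.rabbitmq.QueueSpecification
import reactor.rabbitmq.Receiver
import reactor.rabbitmq.Sender

// Hypothetical helper: declares exchange, queue and binding in sequence.
// Nothing is sent to the broker until the returned Mono is subscribed.
fun declareTopology(sender: Sender, exchange: String, queue: String, routingKey: String): Mono<Void> =
    sender.declare(ExchangeSpecification.exchange(exchange))
        .then(sender.declare(QueueSpecification.queue(queue)))
        .then(sender.bind(BindingSpecification.binding(exchange, routingKey, queue)))
        .then()

// Hypothetical helper: the consumer is only registered once the
// declarations above have completed.
fun consumeAfterTopology(
    sender: Sender,
    receiver: Receiver,
    exchange: String,
    queue: String,
    routingKey: String
): Flux<Delivery> =
    receiver.consumeAutoAck(queue)
        .delaySubscription(declareTopology(sender, exchange, queue, routingKey))
```

Because `delaySubscription` re-subscribes to the declaration Mono on every new subscription, a retry placed downstream would also re-declare the topology before consuming again.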
I think it should not finish with success in case it was cancelled because of connection error
If there's a connection error, the connection recovery should kick in and recover the consumer, the flux should not be affected. If the queue is deleted, the cancel callback is called (AMQP Java client callback) and Reactor RabbitMQ completes the flux. So there is no flux cancellation when there is a connection error.
You're welcome to submit a PR; I'd recommend experimenting with the recovery tests for a start. You should think about the semantics first though, as I think there's some confusion around the "queue deleted" / "connection error" events.
Expected Behavior
The Receiver Flux is not cancelled when the queue is missing, or the consumption is retried.
Actual Behavior
The Receiver Flux is cancelled when the queue is missing.
Steps to Reproduce
Start the application and delete the queue. The consumer is then cancelled (log output: Flux consumer has been cancelled, Cancelling consumer).
Create the queue again somehow, e.g. through topology auto-recovery.
No messages are consumed, because the Receiver Flux is cancelled.
In my case this happened after a redeploy with downtime of a single RabbitMQ node. The queue was lost and then re-created by topology auto-recovery, but my consumer didn't consume any messages.
Your Environment