Closed gnuletik closed 9 months ago
The Publish function does return an error, is there something else I'm missing?
Hi, thanks for the feedback!
Yes, the Publish function does return an error in its signature.
However, when Publish
fails, the error is not propagated to the caller.
For example, in the scenario if the Publish
triggers an ACCESS_REFUSED
error.
The error return value is nil
.
Then, the logger logs:
channelmanager/channel_manager.go:69: attempting to reconnect to amqp server after close with error: Exception (403) Reason: "ACCESS_REFUSED - write access to exchange '...' in vhost '...' refused for user '...'"
channelmanager/channel_manager.go:101: waiting 5s seconds to attempt to reconnect to amqp server
channelmanager/channel_manager.go:124: error closing channel while reconnecting: Exception (504) Reason: "channel/connection is not open"
channelmanager/channel_manager.go:71: successfully reconnected to amqp server
publish.go:115: successful publisher recovery from: Exception (403) Reason: "ACCESS_REFUSED - write access to exchange '...' in vhost '...' refused for user '...'"
This test has been run with github.com/wagslane/go-rabbitmq@v0.12.4
When the error is ACCESS_REFUSED
, that seems legit to discard the error.
However, I'm trying to implement a retry mechanism when the rabbitmq server is being upgraded / restarted by a cloud provider (e.g. AWS AmazonMQ RabbitMQ).
Thanks!
I did a quick test and indeed these type of errors [^1][^2] are not propagated from the amqp
base library; there is nothing any wrapping library can do for you (well, short of checking things, which you could also do in app layer beforehand [^3]).
The fairly clean way out [^4] is to call PublishWithDeferredConfirmWithContext() and then wait on the returned PublisherConfirmation
(array of type amqp.DeferredConfirmation) via various tests (array length, [0] is valid then amqp calls on [0] with context and/or timers, whatever the amqp library offers, ref. amqp.DeferredConfirmation See also the publisher_confirm example.
I hope this helped a bit.
[^1]: publishing on existing exchange but non existing queue does not return any error nor drops/reconnects the channel.
[^2]: publishing on inaccessible exchange/queues (non existing or ownership) will not return any amqp publishing errors but will drop & reconnect the channel. Publishing WithImmediate
will also drop/recover the supporting amqp channel (I wasn't expecting this 🤕 : _Exception (540) Reason: "NOTIMPLEMENTED - immediate=true")
[^3]: a bit of a shame this library does not export the inner channel manager or up-streams the chan manager safe_wrappers to the consumer/publisher objects. This way the apps could have called on all amqp channel functions, including testing for queues via QueueDeclarePassiveSafe
[^4]: looks like you're better off awaiting confirmations from the app layer as described above.
Thanks for investigating into this @LucaWolf !
I'm quite surprised that the underlying library does not propagate 540 exceptions. I tried to look into the library which exceptions returns an error but could not find it.
I'll use PublishWithDeferredConfirmWithContext
to handle this case, thanks!
Glad you found that!
Hello !
I'm wondering if the library provides a way to retry a failed publish? In the event where a publish fails, the Publish function doesn't return an error. Instead it auto-reconnects and discard the message. For example:
I'd like to implement a mechanism to retry with an exponential back-off strategy (either inside the library or in the caller) but I'm not sure if that's possible as the error is not propagated. Do you know if there's a way to implement that?
Thanks!