mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

ChannelInvalidStateError after updating secret and publishing a message #618

Open sidjakinsboriss opened 4 months ago

sidjakinsboriss commented 4 months ago

Hi, I'm using the RabbitMQ OAuth 2.0 authentication backend, and therefore connecting to the broker with access tokens. These tokens can expire, so I have a wrapper around the publishing function that looks somewhat like this:

def refresh_token(func):
    @wraps(func)
    async def wrapper(rabbit_client, *args, **kwargs):
        try:
            await func(rabbit_client, *args, **kwargs)
        except ChannelAccessRefused:
            new_token = rabbit_client.get_access_token()
            await rabbit_client.connection.update_secret(new_token, reason='Token expired')
            await func(rabbit_client, *args, **kwargs)

    return wrapper

The issue is that when calling func after updating the secret, a ChannelInvalidStateError is raised, since the channel was closed. Adding await asyncio.sleep(0.01) before trying to call func again fixes the issue, but seems a bit hacky. Is there some mechanism for waiting until the channel is open again?

MatthiasWerning commented 4 months ago

You should know that every error that is occurring in an AMQP connection will force the connection to be closed. Thus it is normal, that the robust connection will try to establish another connection again after the current one has been closed due to this error.

We are using the OAuth2 backend as well, but have slightly different approach. Since the access token normally includes information about its own expiration one can easily calculate the absolute expiry in time themselves and check for this condition to renew the token BEFORE any subsequent operation is executed in the AMQP connection.

Thus you can always be sure that you have a fresh token before triggering an AMQP operation.