mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Updating credentials not possible in initial connection loop of robust connection #619

Open MatthiasWerning opened 4 months ago

MatthiasWerning commented 4 months ago

We are using the robust connection of aio-pika to connect to our AMQP broker using an OAuth2 token as the AMQP password which was retrieved beforehand. However we are running into an issue with the token itself expiring after a relatively short period of time (currently 15 minutes in our scenario).

When calling the connect_robust() method and the broker is not reachable by the next 15 minutes the token expires. This wouldn't be too much of a problem as a new token can easily be obtained, however since the connect_robust() function is still in its "initial connect loop" an update of the AMQP password using the new token is not possible. Even if the broker is reachable after the token expires, any subsequent connection attempt will never be successful since the token itself has expired and the AMQP broker will refuse any connection attempt.

This might be a rare condition, but if it does happen it will render the app instance unusable since it will never jump out of the connect_robust() function.

Would there be any way of allowing to update AMQP credentials while in the "initial connection loop"? I would think of some sort of a credentials retrieval function that may be passed down the function and which is called in each subsequent connection attempt made by the "initial connection loop". Or is there any other, possibly better way I cannot think about right now?

MatthiasWerning commented 3 months ago

I could also create a pull request with the solution I proposed if this approach would be ok.

Darsstar commented 3 months ago

I expect something like this should work.

while True:
    try:
        connection = connect_robust(login=login, password=password, <snip>, fail_fast=True)
        break;
    except aio_pika.exceptions.CONNECTION_EXCEPTIONS:
        login, password = get_credentials()

The docstring should probably be updated to mention RobustConnection.PARAMETERS at the very least. Even better would be to make unit tests which inspect the signatures of aio_pika.connect() and aio_pika.connect_robust() include all of Connection.PARAMETERS and RobustConnection.PARAMETERS, respectively.

Darsstar commented 3 months ago

oh, I misread the default of fail_fast to be False when it in fact is True. If you are using the default I'm not sure what is going on.

RedLine89 commented 1 month ago

We have exactly the same issue on one of our projects that use OAuth2 with tokens that have quite short lifespan (under 5 mins).

I tried using reconnect_callbacks but they get triggered only after the connection goes through, not before. So there is no way to update password/token on re-attempt.

Pseudo-code:

    async def get_new_token(robustConnection):
        new_token = auth.get_access_token()
        robustConnection.password = new_token

   connection = await aio_pika.connect_robust(host=host, password=token)
   connection.reconnect_callbacks.add(get_new_token)

If we get disconnected from RMQ after token expired, reconnection keeps trying with an old token and we are perpetually stuck with this error:

Connection attempt to "amqp://guest:******@host.docker.internal:5672/test?reconnect_interval=5" failed: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.. Reconnecting after 5 seconds.

It would be very nice if there was a callback that would be called before a reconnect attempt in order to get a new token.

MatthiasWerning commented 1 month ago

Update: I think I messed a few things up here and the error doesn't occur as described. As @Darsstar already mentioned there should be no issue on initial connect except for when the fail_fast parameter is explicitly set to false.

However there is an old token / password being used in the following scenario, which I reproduced today:

  1. Let the client connect to the broker in a regular fashion.
  2. Then shutdown the broker and let the client go into its reconnect loop.
  3. Startup the broker again after the token / password has expired.
  4. The robust connection will still use the old token / password, as its internal url has never been changed.

This also poses an issue with update_secret, as this method can only be called while the client is actually connected to the broker. The current workaround is to update the url myself using the same line of code as in update_secret (but without updating it remotely) after a new valid token is fetched:

self._connection.url = self._connection.url.with_password(new_token)

(Provided your robust connection is available under self._connection and your new token under new_token)

One solution could be to alter update_secret in not trying to update the secret remotely if the connection is currently unavailable, but only locally. The next reconnect will occur using the new url anyway.

MatthiasWerning commented 1 month ago

One more thing to clarify: The token is currently updated in an async loop in a certain interval that will always ensure that a fresh token is available. This is an alternate approach than the one mentioned by @Darsstar which works as well. Though I actually prefer the "lazy" way more as well.