celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

In a multithreaded flow, the add_consumer method gets stuck in an infinite loop #1568

Open rakeshbabuseva opened 2 years ago

rakeshbabuseva commented 2 years ago

When I called the add_consumer method (https://github.com/celery/celery/blob/515f98b7072677439423a15035541d24bfcb2348/celery/app/control.py#L554) for a non-existent worker (the same may happen when the worker is not responding), the process went into an infinite loop, because _lookup returns an empty result for the given exchange or routing key: https://github.com/celery/kombu/blob/f76726270ab6fdf175fab8ce39715c8122a35449/kombu/transport/redis.py#L784 called by https://github.com/celery/kombu/blob/f76726270ab6fdf175fab8ce39715c8122a35449/kombu/transport/redis.py#L427

After checking the code, I understood that the intention here is to restore the tasks based on the unacked_index and unacked queues, which is a good feature to have. However, the redis transaction method is an infinite loop (https://cs.github.com/redis/redis-py/blob/e6cd4fdf3b159d7f118f154e28e884069da89d7e/redis/client.py#L1077), which holds the caller thread/process forever if we keep hitting watch errors.

The change was introduced in https://github.com/celery/kombu/commit/58975b2a95b213596a340c77f52a2eb94d529f54

The Redis watch raises redis.exceptions.WatchError: Watched variable changed.
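To illustrate the loop shape described above, here is a minimal sketch. It is not the redis-py code: `WatchError` is a local stand-in for redis.exceptions.WatchError, and `retry_until_clean` and `conflicting` are hypothetical helpers that only mimic a "retry on every watch error" loop without needing a Redis server.

```python
class WatchError(Exception):
    """Local stand-in for redis.exceptions.WatchError, so the sketch needs no Redis."""


def retry_until_clean(attempt):
    """Retry attempt() on every WatchError, with no upper bound on retries."""
    retries = 0
    while True:
        try:
            return attempt(), retries
        except WatchError:
            retries += 1  # no limit: a permanent conflict never leaves this loop


def conflicting(times):
    """Build a callable that raises WatchError `times` times, then succeeds."""
    remaining = [times]

    def attempt():
        if remaining[0] > 0:
            remaining[0] -= 1
            raise WatchError("Watched variable changed.")
        return "restored"

    return attempt


result, retries = retry_until_clean(conflicting(3))
print(result, retries)
```

With a finite number of conflicts the loop returns after that many retries. If the watched key changes on every attempt, nothing in this shape ever stops the loop, which matches the hang reported here.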

In my opinion, there should be some handling for this scenario. Maybe either skip restoring while adding consumers here: https://github.com/celery/kombu/blob/f76726270ab6fdf175fab8ce39715c8122a35449/kombu/transport/redis.py#L592 (after the visibility timeout it will get cleared anyway), or do not pass the unacked key as a watch here: https://github.com/celery/kombu/blob/f76726270ab6fdf175fab8ce39715c8122a35449/kombu/transport/redis.py#L427 because we are not updating the variable for future use.
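The first option (skip the restore instead of blocking) could look roughly like the sketch below. This is only an assumption about one possible shape, not kombu's code: `restore_or_skip`, `max_attempts` and `delay` are hypothetical names, and `WatchError` is a local stand-in for redis.exceptions.WatchError so the example runs without Redis.

```python
import time


class WatchError(Exception):
    """Local stand-in for redis.exceptions.WatchError."""


def restore_or_skip(restore, max_attempts=3, delay=0.0):
    """Try restore() a bounded number of times.

    Returns True if restore() succeeded, False if every attempt hit a
    WatchError and the restore was skipped.
    """
    for _ in range(max_attempts):
        try:
            restore()
            return True
        except WatchError:
            if delay:
                time.sleep(delay)  # optional pause before the next attempt
    return False


def always_conflicts():
    """Simulates a watched key that changes on every attempt."""
    raise WatchError("Watched variable changed.")


def succeeds():
    """Simulates a restore that goes through on the first attempt."""
    return None


print(restore_or_skip(always_conflicts))
print(restore_or_skip(succeeds))
```

When the helper returns False, the caller (for example the add_consumer path) would continue without restoring, on the assumption stated above that the message is handled after the visibility timeout.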

auvipy commented 2 years ago

Did you find a possible fix?

auvipy commented 2 years ago

@matusvalo

rakeshbabuseva commented 2 years ago

@auvipy I can work on how to avoid this issue, but first please help me understand why we are doing the restore in the add_consumer flow. Is there any reason?

rakeshbabuseva commented 2 years ago

I tested the cancel_consumer call as well; the behaviour is the same.

matusvalo commented 2 years ago

I did not check the issue. I have two questions:

  1. Can you provide a reproducer for the issue?
  2. Could you confirm that the issue is introduced by #1444?

rakeshbabuseva commented 2 years ago

@matusvalo

> Can you provide reproducer for the issue?

I don't have a reproducer script as of now.

> Could you confirm that the issue is introduced by?

Yes. Earlier we did not use the transaction method from the redis library, and did not use the watch command either (https://github.com/celery/kombu/pull/1444/files#diff-203e309d100714904255d0af4500dd295e40e4ffedfdc9295ab3b386718ca87eR732). I think that is what causes this infinite loop. I have given all the links in my description.