python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/

Arq - Worker stops when Redis connection is interrupted #435

Closed: aintItPythonic closed this issue 6 months ago

aintItPythonic commented 6 months ago

We're running multiple pods in Kubernetes with FastAPI, and occasionally the connection to Redis is interrupted.

After Redis comes back, the worker stops processing jobs/tasks. We are only using arq for cron jobs.

Here is the traceback from the connection interruption. We don't know exactly why the connection drops.


    Task exception was never retrieved
    future: <Task finished name='Task-3' coro=<Worker.async_run() done, defined at /usr/local/lib/python3.9/site-packages/arq/worker.py:308> exception=ConnectionError('Connection closed by server.')>
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/arq/worker.py", line 313, in async_run
        await self.main_task
      File "/usr/local/lib/python3.9/site-packages/arq/worker.py", line 354, in main
        await self._poll_iteration()
      File "/usr/local/lib/python3.9/site-packages/arq/worker.py", line 379, in _poll_iteration
        job_ids = await self.pool.zrangebyscore(
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/client.py", line 606, in execute_command
        return await conn.retry.call_with_retry(
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/retry.py", line 62, in call_with_retry
        await fail(error)
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/client.py", line 593, in _disconnect_raise
        raise error
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
        return await do()
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/client.py", line 580, in _send_command_parse_response
        return await self.parse_response(conn, command_name, **options)
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/client.py", line 627, in parse_response
        response = await connection.read_response()
      File "/usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py", line 510, in read_response
        response = await self._parser.read_response(
      File "/usr/local/lib/python3.9/site-packages/redis/_parsers/hiredis.py", line 203, in read_response
        await self.read_from_socket()
      File "/usr/local/lib/python3.9/site-packages/redis/_parsers/hiredis.py", line 186, in read_from_socket
        raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
    redis.exceptions.ConnectionError: Connection closed by server.

Rather than restarting the pod when the connection is interrupted, it would be better if the worker could somehow reset itself once the connection comes back online.

We are on arq 0.25.0, and we are using it with Kubernetes and FastAPI, but I don't think FastAPI has anything to do with this.

I also don't believe it is related to Kubernetes, as we have been able to replicate it locally.

The worker seems to still be alive, but no tasks/jobs are going through it.
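
In the meantime, one workaround would be wrapping the worker in a restart loop, something like the rough sketch below (the cron job and Redis settings are placeholders, and this is not built-in arq behavior):

    import asyncio
    import logging

    from arq.connections import RedisSettings
    from arq.cron import cron
    from arq.worker import Worker
    from redis.exceptions import ConnectionError as RedisConnectionError

    async def my_cron_job(ctx):
        ...  # placeholder cron job

    async def run_forever():
        # Recreate the worker whenever the Redis connection drops, instead of
        # leaving a "zombie" worker that is alive but processes nothing.
        while True:
            worker = Worker(
                cron_jobs=[cron(my_cron_job, minute=0)],
                redis_settings=RedisSettings(host="localhost"),
            )
            try:
                await worker.async_run()
            except RedisConnectionError:
                logging.exception("Redis connection lost, restarting worker in 5s")
                await worker.close()
                await asyncio.sleep(5)

    asyncio.run(run_forever())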

JonasKs commented 6 months ago

I use health checks.

    livenessProbe:
      exec:
        command:
          - arq
          - WorkerSettings
          - --check
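
The probe works because the worker periodically writes a health-check key to Redis with an expiry; arq WorkerSettings --check reads that key back and exits non-zero when it is missing, so the kubelet restarts the container once the worker stops refreshing the key.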
samuelcolvin commented 6 months ago

Looks like @JonasKs has the right solution.

JonasKs commented 6 months ago

I should also add that you want to have a unique health check key for each pod. Add this to your worker settings: health_check_key = socket.gethostname()
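
Something like this, where the cron job and Redis settings are placeholders:

    import socket

    from arq import cron
    from arq.connections import RedisSettings

    async def my_cron_job(ctx):
        ...  # placeholder cron job

    class WorkerSettings:
        cron_jobs = [cron(my_cron_job, minute=0)]
        redis_settings = RedisSettings(host="redis")
        # Unique per pod, so each pod's liveness probe checks its own worker
        # instead of every probe passing because one pod is healthy.
        health_check_key = socket.gethostname()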