procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License
854 stars 52 forks source link

Worker crashes randomly with SQL exceptions #1108

Open hadijaveed opened 3 months ago

hadijaveed commented 3 months ago

Running into an issue where the worker crashes now and then and never restarts. I keep getting the following error on periodic runs

single_worker error: ConnectorException('\n Database error.\n ')

DEFAULT 2024-07-09T17:43:13.149693Z procrastinate.exceptions.RunTaskError: One of the specified coroutines ended with an exception
DEFAULT 2024-07-09T17:43:13.149685Z raise exceptions.RunTaskError from exception_records[0].exc
DEFAULT 2024-07-09T17:43:13.149678Z File "/usr/local/lib/python3.11/site-packages/procrastinate/utils.py", line 363, in run_tasks
DEFAULT 2024-07-09T17:43:13.149673Z await utils.run_tasks(
DEFAULT 2024-07-09T17:43:13.149670Z File "/usr/local/lib/python3.11/site-packages/procrastinate/worker.py", line 143, in run
DEFAULT 2024-07-09T17:43:13.149667Z await asyncio.shield(task)
DEFAULT 2024-07-09T17:43:13.149663Z File "/usr/local/lib/python3.11/site-packages/procrastinate/app.py", line 273, in run_worker_async
DEFAULT 2024-07-09T17:43:13.149658Z ^^^^^^^^^^^^
ERROR 2024-07-09T17:43:13.149655Z Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 732, in lifespan async with self.lifespan_context(app) as maybe_state: File "/usr/local/lib/python3.11/contextlib.py", line 217, in __aexit__ await anext(self.gen) File "/dyrt/dyrt/background/server.py", line 35, in lifespan await asyncio.wait_for(worker, timeout=2) File "/usr/local/lib/python3.11/asyncio/tasks.py", line 489, in wait_for return fut.result()
DEFAULT 2024-07-09T17:43:13.149639Z The above exception was the direct cause of the following exception:
DEFAULT 2024-07-09T17:43:13.149635Z
DEFAULT 2024-07-09T17:43:13.149630Z Database error.
DEFAULT 2024-07-09T17:43:13.149627Z procrastinate.exceptions.ConnectorException:
DEFAULT 2024-07-09T17:43:13.149613Z raise exceptions.ConnectorException from exc
DEFAULT 2024-07-09T17:43:13.149604Z File "/usr/local/lib/python3.11/site-packages/procrastinate/sync_psycopg_connector.py", line 32, in wrap_exceptions
DEFAULT 2024-07-09T17:43:13.149601Z self.gen.throw(typ, value, traceback)
DEFAULT 2024-07-09T17:43:13.149597Z File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
DEFAULT 2024-07-09T17:43:13.149593Z with sync_psycopg_connector.wrap_exceptions():
DEFAULT 2024-07-09T17:43:13.149589Z File "/usr/local/lib/python3.11/site-packages/procrastinate/psycopg_connector.py", line 42, in wrap_exceptions
DEFAULT 2024-07-09T17:43:13.149586Z await self.gen.athrow(typ, value, traceback)
DEFAULT 2024-07-09T17:43:13.149583Z File "/usr/local/lib/python3.11/contextlib.py", line 231, in __aexit__
DEFAULT 2024-07-09T17:43:13.149578Z async with self._recreate_cm():
DEFAULT 2024-07-09T17:43:13.149574Z File "/usr/local/lib/python3.11/contextlib.py", line 96, in inner
DEFAULT 2024-07-09T17:43:13.149570Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
DEFAULT 2024-07-09T17:43:13.149567Z row = await self.connector.execute_query_one_async(
DEFAULT 2024-07-09T17:43:13.149562Z File "/usr/local/lib/python3.11/site-packages/procrastinate/manager.py", line 137, in fetch_job
DEFAULT 2024-07-09T17:43:13.149557Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ERROR 2024-07-09T17:43:13.149554Z Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/procrastinate/worker.py", line 161, in single_worker job = await self.job_manager.fetch_job(self.queues)
DEFAULT 2024-07-09T17:43:13.149530Z The above exception was the direct cause of the following exception:
DEFAULT 2024-07-09T17:43:13.149523Z psycopg_pool.PoolTimeout: couldn't get a connection after 30.00 sec
DEFAULT 2024-07-09T17:43:13.149520Z raise PoolTimeout(
DEFAULT 2024-07-09T17:43:13.149516Z File "/usr/local/lib/python3.11/site-packages/psycopg_pool/pool_async.py", line 226, in getconn
DEFAULT 2024-07-09T17:43:13.149511Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
DEFAULT 2024-07-09T17:43:13.149507Z conn = await self.getconn(timeout=timeout)
DEFAULT 2024-07-09T17:43:13.149503Z File "/usr/local/lib/python3.11/site-packages/psycopg_pool/pool_async.py", line 191, in connection
DEFAULT 2024-07-09T17:43:13.149499Z ^^^^^^^^^^^^^^^^^^^^^
DEFAULT 2024-07-09T17:43:13.149494Z return await anext(self.gen)
DEFAULT 2024-07-09T17:43:13.149490Z File "/usr/local/lib/python3.11/contextlib.py", line 210, in __aenter__
DEFAULT 2024-07-09T17:43:13.149486Z async with self.pool.connection() as connection:
DEFAULT 2024-07-09T17:43:13.149482Z File "/usr/local/lib/python3.11/site-packages/procrastinate/psycopg_connector.py", line 187, in _get_cursor
DEFAULT 2024-07-09T17:43:13.149477Z ^^^^^^^^^^^^^^^^^^^^^
DEFAULT 2024-07-09T17:43:13.149472Z return await anext(self.gen)
DEFAULT 2024-07-09T17:43:13.149469Z File "/usr/local/lib/python3.11/contextlib.py", line 210, in __aenter__
DEFAULT 2024-07-09T17:43:13.149466Z async with self._get_cursor() as cursor:
DEFAULT 2024-07-09T17:43:13.149461Z File "/usr/local/lib/python3.11/site-packages/procrastinate/psycopg_connector.py", line 209, in execute_query_one_async
DEFAULT 2024-07-09T17:43:13.149456Z ^^^^^^^^^^^^^^^^^^^^^^^^^
DEFAULT 2024-07-09T17:43:13.149453Z return await func(*args, **kwds)
DEFAULT 2024-07-09T17:43:13.149449Z File "/usr/local/lib/python3.11/contextlib.py", line 97, in inner

And when I looked into Postgres we simultaneously got the following exception

db=postgres,user=postgres ERROR:  duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx""

Following is my job setup

@task_queue.periodic(cron="0 1 * * *")
@task_queue.task(queueing_lock="re_rank_all_tasks_for_users", pass_context=True)
async def re_rank_all_tasks_for_users(context, timestamp):

I am running the worker in async mode through FastAPI asynccontextmaanger

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task_queue.open_async():
        worker = asyncio.create_task(
            task_queue.run_worker_async(
                install_signal_handlers=False, concurrency=50,
                delete_jobs="successful"
            )
        )
        logger.info("Worker startup")
        yield
        logger.info("Worker Shutdown")
        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=2)
        except asyncio.TimeoutError:
            logger.warning("Ungraceful shutdown")
        except asyncio.CancelledError:
            logger.info("Graceful shutdown")

few times we get the following error. which is not very descriptive either

single_worker error: ConnectorException('\n    Database error.\n    '

Any idea what's going on?

onlyann commented 3 months ago

Logs suggest the root cause is that the pyscopg driver failed to obtain a connection and timed out.

psycopg_pool.PoolTimeout: couldn't get a connection after 30.00 sec

The procrastinate worker will give up when the main coroutine throws an error, which is what happened here.

Maybe the library could add more resiliency by retrying non critical errors.

In the meantime, you might be able to achieve some of that by wrapping the app run in a loop that catches errors and restarts the worker. That would prevent crashing the whole application.

hadijaveed commented 2 months ago

@onlyann thank you for your reply. how can I run a loop to catch these errors? Database connection is available, I was able to validate other Psycog library connections, not sure why single_worker runs into this error, also looks like we need to improve the logs

hadijaveed commented 2 months ago

so I added the loop and re-try mechanism, but seems like following SQL error breaks the full flow

db  | 2024-07-10 18:00:31.609 UTC [37] STATEMENT:  SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts
db  |       FROM procrastinate_fetch_job($1);
db  | 2024-07-10 18:00:31.615 UTC [36] ERROR:  canceling statement due to user request
db  | 2024-07-10 18:00:31.615 UTC [36] CONTEXT:  SQL statement "INSERT
db  |           INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp)
db  |           VALUES (_task_name, _periodic_id, _defer_timestamp)
db  |           ON CONFLICT DO NOTHING
db  |           RETURNING id"
db  |   PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 7 at SQL statement
db  | 2024-07-10 18:00:31.615 UTC [36] STATEMENT:  SELECT procrastinate_defer_periodic_job($1, $2, $3, $4, $5, $6, $7) AS id;
db  | 2024-07-10 18:00:36.635 UTC [39] ERROR:  column "priority" does not exist at character 31
db  | 2024-07-10 18:00:36.635 UTC [39] STATEMENT:  SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts
db  |       FROM procrastinate_fetch_job($1);
db  | 2024-07-10 18:00:36.647 UTC [38] ERROR:  canceling statement due to user request
db  | 2024-07-10 18:00:36.647 UTC [38] CONTEXT:  SQL statement "INSERT
db  |           INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp)
db  |           VALUES (_task_name, _periodic_id, _defer_timestamp)
db  |           ON CONFLICT DO NOTHING
db  |           RETURNING id"
db  |   PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 7 at SQL statement
db  | 2024-07-10 18:00:36.647 UTC [38] STATEMENT:  SELECT procrastinate_defer_periodic_job($1, $2, $3, $4, $5, $6, $7) AS id;

almost all of the errors I run into is because of the above SQL exception

medihack commented 2 months ago

What version of Procrastinate are you using? Have you recently upgraded?

db  | 2024-07-10 18:00:36.635 UTC [39] ERROR:  column "priority" does not exist at character 31

priority is a recently added feature. Have you applied the SQL migrations after the package update? (But I am still wondering why this then is a sporadic crash.)

hadijaveed commented 2 months ago

@medihack I am using 2.7.0 version, since I did not lock the package version

and yes it's very frustrating whole worker would crash again and again due to missing column SQL exception

if there are schema changes, package should detect new changes, give warnings or there should be some kind of backward compatibility, regardless I think worker should not crash on such small exceptions

hadijaveed commented 2 months ago

I am unsure if this is related to the new package release 2.7.0, but the worker crashed repeatedly on a small SQL exception. and the error log produced by procrastinate are not helpful at all, unless you look into Postgres DB error logs

Error produced by procrastinate:

app  | Main coroutine error, initiating remaining coroutines stop. Cause: ConnectorException('\n    Database error.\n    ')
app | single_worker error: ConnectorException('\n    Database error.\n    ')
app  | NoneType: None

Error produced by Postgres

Db   | 2024-07-11 04:33:31.386 UTC [36] ERROR:  Job was not found or not in "doing" status (job id: 75)
Db   | 2024-07-11 04:33:31.386 UTC [36] CONTEXT:  PL/pgSQL function procrastinate_retry_job(bigint,timestamp with time zone) line 12 at RAISE
Db   | 2024-07-11 04:33:31.386 UTC [36] STATEMENT:  SELECT procrastinate_retry_job($1, $2);

I am un-clear why would worker crash on such small issues?

onlyann commented 2 months ago

if there are schema changes, package should detect new changes, give warnings or there should be some kind of backward compatibility

Yes, it would be great to improve the experience on the migration aspect. This is tracked by #1040. Would you like to contribute?

In the meantime, I invite you to read the documentation on how migrations are handled for this library: https://procrastinate.readthedocs.io/en/stable/howto/production/migrations.html

the error log produced by procrastinate are not helpful at all, unless you look into Postgres DB error logs

The Database Error." happens to be the default message of ConnectorException. It should however include the inner error in its __cause__. This is typically output when logging the entire exception and not just the exception message.

That said, it is possible there is an issue within the library that doesn't output enough details. What do you say @ewjoachim ?

I am un-clear why would worker crash on such small issues?

It may look like a "small" issue, but there is not much the worker could do here. The schema change introduced in version 2.7.0 is not backward compatible. The newly introduced priority argument is part of the SQL query sent by Procrastinate 2.7.0 but because the migration has not run, the procrastinate_defer_periodic_job function does not have the correct list of arguments.

ewjoachim commented 2 months ago

if there are schema changes, package should detect new changes, give warnings or there should be some kind of backward compatibility, regardless I think worker should not crash on such small exceptions

There's backwards compatibility, the other way around (or forward compatibility if you want, it depends how you see it): you can run older versions of the code with newer versions of the schema, this is required for creating a no-downtime migration path.

Having both forward & backward compatibility (being able to run the old code with the new schema or the old schema with the new code) seems like a very complicated thing to do, I think, especially since we don't know how many versions people will skip and if they'll actually end up running the migrations. I'd rather find a way to ensure you can't run procrastinate with an old schema.

medihack commented 2 months ago

Maybe we should also add a more obvious hint on the documentation's start page or the quickstart page that a manual migration step may be required after a package upgrade for non-Django users. And maybe also that the version should be locked to a minor version when using tools like poetry, pipenv, rye, ...

hadijaveed commented 2 months ago

thank you for the help. and makes sense. I think for now locking a version would help

still unclear on why the worker crashes every time on small minor errors like following, which are not related to schema but more related to state of a task

Db   | 2024-07-11 04:33:31.386 UTC [36] ERROR:  Job was not found or not in "doing" status (job id: 75)
Db   | 2024-07-11 04:33:31.386 UTC [36] CONTEXT:  PL/pgSQL function procrastinate_retry_job(bigint,timestamp with time zone) line 12 at RAISE
Db   | 2024-07-11 04:33:31.386 UTC [36] STATEMENT:  SELECT procrastinate_retry_job($1, $2);

the above is a minor state that should not crash the full worke

medihack commented 2 months ago

Have you tried upgrading the package and applying the migrations? Does the problem still exist? What's the retry strategy? How does the task look like?

hadijaveed commented 2 months ago

@medihack I have upgraded the package, it has fixed the schema inconsistency error but I still run into the following error consistently from time to time, even re-starting the worker won't help, the only way is to clear out jobs that are not successful. the worker crash entirely on minor SQL exceptions, following is the exception

Db   | 2024-07-11 04:33:31.386 UTC [36] ERROR:  Job was not found or not in "doing" status (job id: 75)
Db   | 2024-07-11 04:33:31.386 UTC [36] CONTEXT:  PL/pgSQL function procrastinate_retry_job(bigint,timestamp with time zone) line 12 at RAISE
Db   | 2024-07-11 04:33:31.386 UTC [36] STATEMENT:  SELECT procrastinate_retry_job($1, $2);

This is how worker is initialized in FastAPI

task_queue = App(connector=PsycopgConnector(
    conninfo=os.getenv("PROCRASTINATE_DB_URL")
))

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task_queue.open_async():
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )
        # Set to 100 to test the ungraceful shutdown
        await sleep.defer_async(length=5)

        print("STARTUP")
        yield
        print("SHUTDOWN")

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except asyncio.TimeoutError:
            print("Ungraceful shutdown")
        except asyncio.CancelledError:
            print("Graceful shutdown")

app = FastAPI(lifespan=lifespan)

example jobs with re-try strategy

@task_queue.task(retry=RetryStrategy(
    max_attempts=5,
    wait=5,
    exponential_wait=5
))
async def process_file(file_id: str):
    …

@task_queue.periodic(cron="*/10 * * * *")
@task_queue.task(queueing_lock="retry_stalled_jobs", pass_context=True)
async def retry_stalled_jobs(context, timestamp):
    …
medihack commented 2 months ago

I wouldn't call it minor. It's something that should not happen. From looking at the worker code, the retry happens before the status of the job is changed and the job may be deleted. But I wonder if your retry_stalled_jobs somehow interferes with your process_file task. It's unclear to me which procrastinate_retry_job leads to that error, that of the process_file task or the retry you probably have in the retry_stalled_jobs task. Does the Python traceback give more information on that? Do these sporadic errors still happen without the periodic `retry_stalled_jobs´ task?