procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License
876 stars 55 forks source link

Does the worker defer a task when it is occupied with another task? #1158

Closed paulzakin closed 3 months ago

paulzakin commented 3 months ago

minute 0 = Both tasks are added minute 1 = check_periodic_again should be added and an error should be thrown (because of the lock) but zero database activity occurs minute 2 = check_periodic finishes - attempts to add a new task to check_periodic (works) and check_periodic_again (works).

What should happen - minute 1 = should add the task?

CHECK_PERIODIC = "check_periodic"
CHECK_PERIODIC_AGAIN = "check_periodic_again"

# Runs every minute, but with a delay
@healthz_blueprint.task(name=CHECK_PERIODIC, cron="* * * * *", queueing_lock=CHECK_PERIODIC)
def check_periodic(*args: Any, **kwargs: Any) -> None:
    time.sleep(120)
    info("🚀 Procrastinate is running periodically!")

# Runs every minute
@healthz_blueprint.task(name=CHECK_PERIODIC_AGAIN, cron="* * * * *", queueing_lock=CHECK_PERIODIC_AGAIN)
def check_periodic_again(*args: Any, **kwargs: Any) -> None:
    info("🚀 Procrastinate is running periodically again!")
paulzakin commented 3 months ago

The log at m1 = nothing The log at m2 = below

🚀 Procrastinate is running periodically!
2024-08-14 06:39:52,987 INFO    procrastinate.periodic Periodic job healthz:check_periodic[28](timestamp=1723617480) deferred for timestamp 1723617480 with id 28
2024-08-14 06:39:52,989 ERROR   procrastinate.utils Main coroutine error, initiating remaining coroutines stop. Cause: IntegrityError('duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"\nDETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.\nCONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)\n    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)\n    RETURNING id"\nPL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement\nSQL statement "UPDATE procrastinate_periodic_defers\n        SET job_id = procrastinate_defer_job(\n                _queue_name,\n                _task_name,\n                0,\n                _lock,\n                _queueing_lock,\n                _args,\n                NULL\n            )\n        WHERE id = _defer_id\n        RETURNING job_id"\nPL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement')
2024-08-14 06:39:52,989 INFO    procrastinate.worker Stop requested
2024-08-14 06:39:52,989 INFO    procrastinate.worker Waiting for job to finish: worker 0: healthz:check_periodic[26](timestamp=1723617420) (started -0.006 s ago)
2024-08-14 06:39:52,994 INFO    procrastinate.worker Job healthz:check_periodic[26](timestamp=1723617420) ended with status: Success, lasted 120.019 s
2024-08-14 06:39:53,000 DEBUG   procrastinate.worker Acknowledged job completion healthz:check_periodic[26](timestamp=1723617420)
2024-08-14 06:39:53,000 DEBUG   procrastinate.utils single_worker finished execution
2024-08-14 06:39:53,000 DEBUG   procrastinate.utils Stopped listener
2024-08-14 06:39:53,000 ERROR   procrastinate.utils periodic_deferrer error: IntegrityError('duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"\nDETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.\nCONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)\n    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)\n    RETURNING id"\nPL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement\nSQL statement "UPDATE procrastinate_periodic_defers\n        SET job_id = procrastinate_defer_job(\n                _queue_name,\n                _task_name,\n                0,\n                _lock,\n                _queueing_lock,\n                _args,\n                NULL\n            )\n        WHERE id = _defer_id\n        RETURNING job_id"\nPL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement')
NoneType: None
2024-08-14 06:39:53,001 DEBUG   procrastinate.signals Resetting previous signal handler
2024-08-14 06:39:53,001 DEBUG   procrastinate.cli Exception details:
Traceback (most recent call last):
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 105, in _execute
    return self.cursor.execute(sql, params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/psycopg/cursor.py", line 97, in execute
    raise ex.with_traceback(None)
psycopg.errors.UniqueViolation: duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"
DETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.
CONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    RETURNING id"
PL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement
SQL statement "UPDATE procrastinate_periodic_defers
        SET job_id = procrastinate_defer_job(
                _queue_name,
                _task_name,
                0,
                _lock,
                _queueing_lock,
                _args,
                NULL
            )
        WHERE id = _defer_id
        RETURNING job_id"
PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/.venv/lib/python3.12/site-packages/procrastinate/utils.py", line 335, in run_tasks
    await task
  File "/.venv/lib/python3.12/site-packages/procrastinate/worker.py", line 122, in periodic_deferrer
    return await deferrer.worker()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/procrastinate/periodic.py", line 132, in worker
    await self.defer_jobs(jobs_to_defer=self.get_previous_tasks(at=now))
  File "/.venv/lib/python3.12/site-packages/procrastinate/periodic.py", line 220, in defer_jobs
    job_id = await job_deferrer.job_manager.defer_periodic_job(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/procrastinate/manager.py", line 104, in defer_periodic_job
    result = await self.connector.execute_query_one_async(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/procrastinate/contrib/django/django_connector.py", line 92, in execute_query_one_async
    return await asgiref.sync.sync_to_async(self.execute_query_one)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/paulzakin/.local/share/mise/installs/python/3.12.5/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/paulzakin/.local/share/mise/installs/python/3.12.5/lib/python3.12/contextlib.py", line 81, in inner
    return func(*args, **kwds)
           ^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/procrastinate/contrib/django/django_connector.py", line 124, in execute_query_one
    cursor.execute(query, self._wrap_json(arguments))
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 122, in execute
    return super().execute(sql, params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 79, in execute
    return self._execute_with_wrappers(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 92, in _execute_with_wrappers
    return executor(sql, params, many, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 100, in _execute
    with self.db.wrap_database_errors:
  File "/.venv/lib/python3.12/site-packages/django/db/utils.py", line 91, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/.venv/lib/python3.12/site-packages/django/db/backends/utils.py", line 105, in _execute
    return self.cursor.execute(sql, params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.12/site-packages/psycopg/cursor.py", line 97, in execute
    raise ex.with_traceback(None)
django.db.utils.IntegrityError: duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"
DETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.
CONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    RETURNING id"
PL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement
SQL statement "UPDATE procrastinate_periodic_defers
        SET job_id = procrastinate_defer_job(
                _queue_name,
                _task_name,
                0,
                _lock,
                _queueing_lock,
                _args,
                NULL
            )
        WHERE id = _defer_id
        RETURNING job_id"
PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File /.venv/lib/python3.12/site-packages/procrastinate/cli.py", line 520, in execute_command
    await parsed.pop("func")(app=app, **parsed)
  File "/.venv/lib/python3.12/site-packages/procrastinate/cli.py", line 542, in worker_
    await app.run_worker_async(**kwargs)
  File "/.venv/lib/python3.12/site-packages/procrastinate/app.py", line 273, in run_worker_async
    await asyncio.shield(task)
  File "/.venv/lib/python3.12/site-packages/procrastinate/worker.py", line 143, in run
    await utils.run_tasks(
  File "/.venv/lib/python3.12/site-packages/procrastinate/utils.py", line 364, in run_tasks
    raise exceptions.RunTaskError from exception_records[0].exc
procrastinate.exceptions.RunTaskError: One of the specified coroutines ended with an exception
duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"
DETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.
CONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    RETURNING id"
PL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement
SQL statement "UPDATE procrastinate_periodic_defers
        SET job_id = procrastinate_defer_job(
                _queue_name,
                _task_name,
                0,
                _lock,
                _queueing_lock,
                _args,
                NULL
            )
        WHERE id = _defer_id
        RETURNING job_id"
PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement
duplicate key value violates unique constraint "procrastinate_jobs_queueing_lock_idx"
DETAIL:  Key (queueing_lock)=(check_periodic_again) already exists.
CONTEXT:  SQL statement "INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
    RETURNING id"
PL/pgSQL function procrastinate_defer_job(character varying,character varying,integer,text,text,jsonb,timestamp with time zone) line 5 at SQL statement
SQL statement "UPDATE procrastinate_periodic_defers
        SET job_id = procrastinate_defer_job(
                _queue_name,
                _task_name,
                0,
                _lock,
                _queueing_lock,
                _args,
                NULL
            )
        WHERE id = _defer_id
        RETURNING job_id"
PL/pgSQL function procrastinate_defer_periodic_job(character varying,character varying,character varying,character varying,character varying,bigint,jsonb) line 17 at SQL statement
One of the specified coroutines ended with an exception
paulzakin commented 3 months ago

As an experiment, I removed the locks:

minute 0 = Both tasks are added minute 1 = check_periodic_again should be added (because of no lock) but zero database activity occurs minute 2 = check_periodic finishes - attempts to add a new task to check_periodic (works) and check_periodic_again (works).

onlyann commented 3 months ago

@paulzakin can you please also share how you are starting the worker?

paulzakin commented 3 months ago

Yup, via the Django command python manage.py procrastinate worker on procrastinate==2.11.0

ewjoachim commented 3 months ago

It's possible that the merging of https://github.com/procrastinate-org/procrastinate/pull/1160 fixed the side effect that made this issue visible, but we should fix it correctly (fix the real issue, cf https://discord.com/channels/1197292025725329549/1197292027298201693/1273315407331201031)