ayrat555 / fang

Background processing for Rust
https://fang.badykov.com/
MIT License

Duplicates are created for unique tasks when running fang on multiple processes #146

Open azhur opened 10 months ago

azhur commented 10 months ago

I was playing with async cron tasks from fang-examples.

I configured the task to be unique and to keep the history of executions.

After running the example on 2 parallel processes I noticed duplicated tasks in the DB and in the application logs.

insert_task_if_not_exist_query seems to not handle the case when multiple instances (processes) are running in parallel.
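
To illustrate the race (this assumes the uniqueness check is a plain select-then-insert, which I haven't verified against fang's source), two processes can both pass the check before either insert lands:

    -- process A: SELECT * FROM fang_tasks WHERE uniq_hash = $1;  -- no row found
    -- process B: SELECT * FROM fang_tasks WHERE uniq_hash = $1;  -- no row found either
    -- process A: INSERT INTO fang_tasks (...) VALUES (...);      -- task created
    -- process B: INSERT INTO fang_tasks (...) VALUES (...);      -- duplicate created
    --
    -- Without a database-level constraint both inserts succeed, so an
    -- application-level check alone cannot prevent the duplicates.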

A possible solution to this is to do something like:

  1. Create a unique index like:
    CREATE UNIQUE INDEX fang_task_uniqueness_idx ON fang_tasks (uniq_hash, state) WHERE (uniq_hash is not null and state in ('new', 'in_progress', 'retried'));
  2. update insert_task_uniq.sql:
    INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) ON CONFLICT DO NOTHING RETURNING *;

    WARNING: the SQL statements above were not tested; they might contain errors or be suboptimal, so they should not be treated as a working solution before testing.
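
One detail to keep in mind with that approach (this is my reading of PostgreSQL's ON CONFLICT behaviour, not something I verified against fang): when the conflict fires, ON CONFLICT DO NOTHING RETURNING * returns zero rows, so the caller would need a fallback select to fetch the task that won the race, roughly:

    -- untested sketch: insert, then fall back to the existing row when the
    -- unique index rejects the duplicate and RETURNING * yields nothing
    INSERT INTO "fang_tasks" ("metadata", "task_type", "uniq_hash", "scheduled_at")
    VALUES ($1, $2, $3, $4)
    ON CONFLICT DO NOTHING
    RETURNING *;

    -- run by the application only when the insert above returned no rows
    SELECT * FROM "fang_tasks"
    WHERE "uniq_hash" = $3 AND "state" IN ('new', 'in_progress', 'retried')
    LIMIT 1;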

This probably doesn't make sense to do if fang is not designed to work across multiple processes. In that case, it would be worth mentioning that fact somewhere in the docs (sorry if it is mentioned already and I missed it).

azhur commented 10 months ago

ah, just discovered that my solution won't work: fang can have a unique task in the 'new' and 'in_progress' states at the same time, so the WHERE clause of the unique index should probably be restricted on the state column to ... and state = 'new'

v0idpwn commented 10 months ago

I feel like the conditional state index isn't a great approach: the race condition persists (although with a smaller window), since the first task may be picked up and moved to "in progress" before the second process tries to insert its copy, no?

v0idpwn commented 10 months ago

Perhaps unique could be redefined to mean "unique within a period of time" instead of "unique within a state"? 🤔 Though that might require significant changes.
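
Purely as a sketch of that idea (the uniq_bucket column and the one-minute window are my own illustration, not anything fang currently has), uniqueness could be keyed on the hash plus a time bucket derived from scheduled_at, independent of the task's state:

    -- hypothetical migration: the column name and bucket size are illustrative
    ALTER TABLE fang_tasks ADD COLUMN uniq_bucket timestamptz;

    CREATE UNIQUE INDEX fang_task_uniq_bucket_idx
        ON fang_tasks (uniq_hash, uniq_bucket)
        WHERE uniq_hash IS NOT NULL;

    -- on insert the application would set, for example,
    --   uniq_bucket = date_trunc('minute', scheduled_at)
    -- so two inserts of the same unique task within the same minute collide,
    -- regardless of whether the first one is already 'in_progress' or finished.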

azhur commented 10 months ago

How about using PG advisory locks when a unique task is scheduled/inserted in AsyncQueue::schedule_task and AsyncQueue::insert_task?

Not sure, though, whether other databases have something similar.
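
Roughly what I have in mind (untested sketch; the query shape is an assumption on my side, not fang's actual SQL):

    BEGIN;
    -- serialize on the task's uniq_hash; hashtext() maps the text hash to the
    -- integer key that the advisory-lock function expects
    SELECT pg_advisory_xact_lock(hashtext($3));

    -- insert only if no live task with the same uniq_hash exists
    INSERT INTO "fang_tasks" ("metadata", "task_type", "uniq_hash", "scheduled_at")
    SELECT $1, $2, $3, $4
    WHERE NOT EXISTS (
        SELECT 1 FROM "fang_tasks"
        WHERE "uniq_hash" = $3 AND "state" IN ('new', 'in_progress', 'retried')
    )
    RETURNING *;
    COMMIT;  -- the advisory lock is released automatically when the transaction ends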

The https://github.com/ayrat555/fang/blob/0.10.4/fang/src/asynk/queries/find_task_by_uniq_hash.sql query will need to be updated to include the in_progress state as well. BTW, any idea why in_progress is not included in that query?

ayrat555 commented 10 months ago

Hello. I'm sorry for the late reply. I will take a look this week.