procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License

Having one job depend on another (or several). #1198

Open nikita-krokosh opened 1 month ago

nikita-krokosh commented 1 month ago

Hi. Let's say I need a job to run only if 2 others (created before) have completed successfully. AFAIU there's no support for this.

Could I extend the procrastinate job queue table myself and add an array column holding the IDs of the required jobs, so that I can check whether all of them are finished inside my task, either manually or with some decorator? In other words, are extra columns supported, or will they break the standard flows?

ewjoachim commented 1 month ago

AFAIU there's no support for this

I think you're right

is having extra columns supported and won't break standard flows?

I don't think it's a good idea, but I can't tell you exactly what will fail. I think you should rather do this in a separate table; you can add a foreign key as long as you handle the cases where the job gets deleted (so ON DELETE CASCADE, I guess).
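
For illustration, a separate tracking table along those lines could look like the sketch below. It assumes the default procrastinate_jobs table (bigint id, status values such as 'succeeded'); the job_dependencies table and the query names are made up, not part of procrastinate.

# Sketch only: dependencies live in their own table instead of extra columns
# on procrastinate_jobs. Assumes the default procrastinate_jobs schema
# (bigint id, status enum) -- verify against your procrastinate version.
CREATE_JOB_DEPENDENCIES = """
CREATE TABLE job_dependencies (
    job_id bigint NOT NULL
        REFERENCES procrastinate_jobs (id) ON DELETE CASCADE,
    depends_on_job_id bigint NOT NULL
        REFERENCES procrastinate_jobs (id) ON DELETE CASCADE,
    PRIMARY KEY (job_id, depends_on_job_id)
)
"""

# True when every registered dependency of %(job_id)s has succeeded,
# NULL when the job has no registered dependencies.
DEPENDENCIES_SATISFIED = """
SELECT bool_and(j.status = 'succeeded')
FROM job_dependencies d
JOIN procrastinate_jobs j ON j.id = d.depends_on_job_id
WHERE d.job_id = %(job_id)s
"""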

onlyann commented 1 month ago

Assuming a task C needs to run when tasks A and B complete, introduce an orchestrator task O that defers tasks A and B, waits for both of them to complete, and then defers task C.

Would that work in your scenario?

ewjoachim commented 1 month ago

Hm, I'm not sure the "Orchestrator task" is a concept I like. If the worker running it crashes, the whole set of tasks will be completely broken.

onlyann commented 1 month ago

It is not foolproof but might be good enough. A variant of this (still imperfect) is to defer A and B, and then defer a task O that receives the job ids it should wait for. Task O waits for those jobs to complete before deferring task C.

This way, the task O can be retried if the worker crashes.
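
A rough sketch of that variant, assuming procrastinate's task/App API and that defer_async returns the new job's id. fetch_status is a hypothetical helper, not a procrastinate function; you would implement it against the job manager or the procrastinate_jobs table, and in practice you would probably bound the polling loop and configure a real retry strategy.

import asyncio

import procrastinate

# Connector choice is illustrative; configure the App as you normally would.
app = procrastinate.App(connector=procrastinate.PsycopgConnector())


@app.task
async def task_a(): ...


@app.task
async def task_b(): ...


@app.task
async def task_c(): ...


async def fetch_status(job_id: int) -> str:
    """Hypothetical helper: read the job's status ('todo', 'doing',
    'succeeded', 'failed', ...) from the procrastinate_jobs table."""
    raise NotImplementedError


@app.task(retry=True)  # if the worker crashes mid-wait, O is simply retried
async def task_o(job_ids: list[int]):
    while True:
        statuses = [await fetch_status(job_id) for job_id in job_ids]
        if any(status == "failed" for status in statuses):
            return  # a dependency failed: don't defer C
        if all(status == "succeeded" for status in statuses):
            break
        await asyncio.sleep(5)  # poll until A and B are both done
    await task_c.defer_async()


async def defer_the_three_tasks():
    a_id = await task_a.defer_async()
    b_id = await task_b.defer_async()
    # O only knows the job ids it has to wait for.
    await task_o.defer_async(job_ids=[a_id, b_id])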

TkTech commented 1 month ago

So the way Chancy does this seems to work (1b+ workflows so far), and looking at the model for Procrastinate I don't see why something similar wouldn't be an option, if there's no issue with adding more tables. It can be done in a way that avoids modifying the existing jobs and instead just builds on top of them.

Two tables are used, the first to track the workflows themselves:

CREATE TABLE {workflows} (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
)

And another to track each step inside a workflow, which will be updated with the ID of the job once it's started (for procrastinate, job_id becomes a bigint, state becomes status, etc.):

CREATE TABLE {workflow_steps} (
    id SERIAL PRIMARY KEY,
    workflow_id UUID REFERENCES {workflows}(id)
        ON DELETE CASCADE,
    step_id TEXT NOT NULL,
    job_data JSON NOT NULL,
    dependencies JSON NOT NULL,
    job_id UUID,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
)
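
A possible procrastinate-flavored adaptation of that steps table, just as a sketch (the table name is invented, and it assumes procrastinate_jobs uses a bigint id):

# Sketch of the adaptation mentioned above: job_id points at
# procrastinate_jobs, and joins read procrastinate's "status" column
# instead of Chancy's "state".
CREATE_WORKFLOW_STEPS = """
CREATE TABLE workflow_steps (
    id SERIAL PRIMARY KEY,
    workflow_id UUID REFERENCES workflows (id) ON DELETE CASCADE,
    step_id TEXT NOT NULL,
    job_data JSONB NOT NULL,
    dependencies JSONB NOT NULL,
    job_id BIGINT,  -- filled with the procrastinate_jobs id once deferred
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
)
"""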

Periodically, incomplete workflows are picked up and processed:

while await self.sleep(self.polling_interval):
    await self.wait_for_leader(worker)
    workflows = await self.fetch_workflows(
        chancy,
        states=["pending", "running"],
        limit=self.max_workflows_per_run,
    )
    for workflow in workflows:
        await self.process_workflow(chancy, workflow)

Fetching the workflows and their steps can be done in a single quick query thanks to json_build_object:

SELECT
    w.id,
    w.name,
    w.state,
    w.created_at,
    w.updated_at,
    COALESCE(json_agg(
        json_build_object(
            'step_id', ws.step_id,
            'job_data', ws.job_data,
            'dependencies', ws.dependencies,
            'state', j.state,
            'job_id', ws.job_id
        )
    ), '[]'::json) AS steps
FROM {workflows} w
LEFT JOIN {workflow_steps} ws ON w.id = ws.workflow_id
LEFT JOIN {jobs} j ON ws.job_id = j.id
WHERE (
    %(states)s::text[] IS NULL OR
    w.state = ANY(%(states)s::text[])
) AND (
    %(ids)s::uuid[] IS NULL OR
    w.id = ANY(%(ids)s::uuid[])
)
GROUP BY w.id, w.name, w.state
LIMIT {limit}

And then the process to progress a workflow becomes trivial, ~15 lines: https://github.com/TkTech/chancy/blob/main/chancy/plugins/workflow/__init__.py#L340. job_data on each step contains the serialized job, which gets inserted into the job table like any normal job once all of the step's dependencies are met.
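
Not the linked Chancy code, but a sketch of the same idea adapted to procrastinate. It assumes workflow rows shaped like the aggregated query above, procrastinate's App.configure_task for deferring a job by name, and two hypothetical persistence helpers; treat all of those as assumptions to check.

async def process_workflow(app, workflow, save_step_job_id, mark_workflow_done):
    """Defer every step whose dependencies have all succeeded.

    workflow["steps"] is the aggregated JSON from the query above;
    save_step_job_id and mark_workflow_done are hypothetical helpers that
    write back to workflow_steps / workflows.
    """
    steps = {step["step_id"]: step for step in workflow["steps"]}

    for step in steps.values():
        if step["job_id"] is not None:
            continue  # already deferred
        deps = [steps[dep] for dep in step["dependencies"]]
        if not all(dep["state"] == "succeeded" for dep in deps):
            continue  # some dependency is still pending, running or failed
        job_data = step["job_data"]
        # Defer the serialized job like any normal procrastinate job.
        job_id = await app.configure_task(
            name=job_data["task_name"],
            queue=job_data.get("queue", "default"),
        ).defer_async(**job_data.get("args", {}))
        await save_step_job_id(workflow["id"], step["step_id"], job_id)

    if all(
        step["job_id"] is not None and step["state"] == "succeeded"
        for step in steps.values()
    ):
        await mark_workflow_done(workflow["id"])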

Since procrastinate doesn't have a leadership node, we'd add a locked_at column to the workflow table and do a SELECT ... FOR UPDATE with a timeout, and let every worker take a shot at periodically progressing workflows.
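
A sketch of what that claim query could look like (the workflow table and column names follow the sketch above; none of this is procrastinate's or Chancy's actual SQL):

# One worker at a time claims a batch of workflows to progress. locked_at
# doubles as the timeout: a lock older than the interval is considered
# stale and can be taken over. FOR UPDATE SKIP LOCKED keeps concurrent
# workers from blocking on each other's rows.
CLAIM_WORKFLOWS = """
UPDATE workflows
SET locked_at = NOW()
WHERE id IN (
    SELECT id
    FROM workflows
    WHERE state IN ('pending', 'running')
      AND (locked_at IS NULL OR locked_at < NOW() - INTERVAL '5 minutes')
    ORDER BY created_at
    LIMIT %(limit)s
    FOR UPDATE SKIP LOCKED
)
RETURNING id
"""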

This way jobs don't know they are part of a workflow, no persistent job is needed, just a periodic one, and the only relationship between the two is the job ID and state. Since this implements DAG-based workflows, it also becomes easy to re-implement Celery's Chain and Group - 6 lines.
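
For example, with a steps table like the one above, chain and group reduce to how each step's dependencies list is filled in (a sketch, not Chancy's actual implementation; steps here are plain dicts with step_id and dependencies keys):

def chain(steps):
    # Each step depends on the one before it: A -> B -> C.
    for previous, step in zip(steps, steps[1:]):
        step["dependencies"] = [previous["step_id"]]
    return steps


def group(steps):
    # No dependencies between the steps: they can all run concurrently.
    for step in steps:
        step["dependencies"] = []
    return steps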

nikita-krokosh commented 1 month ago

Thanks for all the answers folks. I'll try to investigate more and come up with something. I was thinking about Celery too, but as far as I understand there's also no easy way there to do this for jobs scheduled from different places: