allegroai / clearml

ClearML - Auto-Magical CI/CD to streamline your AI workload. Experiment Management, Data Management, Pipeline, Orchestration, Scheduling & Serving in one MLOps/LLMOps solution
https://clear.ml/docs
Apache License 2.0

feat(scheduler): :sparkles: Adds task_id_function to TaskScheduler #1212

Open natephysics opened 7 months ago

natephysics commented 7 months ago

Related Issue \ discussion

https://github.com/allegroai/clearml/issues/1211 https://clearml.slack.com/archives/CTK20V944/p1708447659999379?thread_ts=1708445057.172119&cid=CTK20V944

Patch Description

Adds a new parameter, task_id_function, to TaskScheduler.add_task(). It takes a callable that is expected to return a task_id. The callable runs at runtime (when the task scheduler would normally execute the scheduled task), and the scheduler uses the returned task_id, together with the other parameters passed to .add_task(), as the scheduled task.

Motivation

Why is this useful? There's a host of reasons, but the biggest one is that it gives users much more control over the tasks that are run by the task scheduler. Currently, as far as I can tell, if I want to run the most recent task (at runtime) from a given project with a specific tag, that isn't possible with the task scheduler. I can use the schedule_function parameter and create a function that finds and runs the task, but then I lose the core advantages of .add_task(): there is no way to specify queues, task_parameters, or task_overrides. Naturally, I could wrap all of that into the function called by schedule_function, but at that point I'm basically just writing my own scheduler. This will also let you do some preprocessing before returning the task_id, for example if you want to clean up old tasks.
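To make the "most recent task with a specific tag" case concrete, here is a rough sketch of what such a callable could look like. The TaskRecord type, the in-memory RECORDS list, and the names latest_task_id_with_tag and pick_latest_production are hypothetical stand-ins so the example runs without a ClearML server; a real function would query the server at runtime instead (for example with Task.get_tasks).

```python
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from typing import Iterable, Tuple


@dataclass(frozen=True)
class TaskRecord:
    """Hypothetical stand-in for the task metadata a server query would return."""
    task_id: str
    tags: Tuple[str, ...]
    last_update: datetime


def latest_task_id_with_tag(records: Iterable[TaskRecord], tag: str) -> str:
    """Return the ID of the most recently updated record carrying `tag`."""
    candidates = [r for r in records if tag in r.tags]
    if not candidates:
        raise LookupError(f"no task found with tag {tag!r}")
    return max(candidates, key=lambda r: r.last_update).task_id


# Illustrative data only; in practice this would be fetched when the job fires.
RECORDS = [
    TaskRecord("aaa111", ("production",), datetime(2024, 2, 1)),
    TaskRecord("bbb222", ("production", "nightly"), datetime(2024, 2, 15)),
    TaskRecord("ccc333", ("experimental",), datetime(2024, 2, 20)),
]

# A zero-argument callable, the shape the proposed task_id_function expects.
pick_latest_production = partial(latest_task_id_with_tag, RECORDS, "production")

print(pick_latest_production())
```

Binding the project-specific arguments with functools.partial keeps the callable zero-argument, so it could be handed to the proposed parameter while queue, task_parameters, and task_overrides stay on .add_task().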

Testing Instructions

Define a function that returns a task_id by whatever means:

def yourfunction():
    return 'some_task_id_string'

With the task scheduler, call .add_task(..., schedule_task_id=None, task_id_function=yourfunction, ...). At the time the task is scheduled to execute, the function should run, and the returned task_id will be used as if it had originally been passed to schedule_task_id.
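The expected behaviour (nothing runs at registration, the callable runs once when the job fires) can be illustrated with a small stand-in. This is only a sketch under assumptions: MiniJob, resolve_task_id, and launch are hypothetical names, and this is not ClearML's TaskScheduler code or the code in this patch.

```python
from typing import Callable, Dict, List, Optional


class MiniJob:
    """Hypothetical stand-in for one scheduled job; not ClearML's implementation."""

    def __init__(
        self,
        schedule_task_id: Optional[str] = None,
        task_id_function: Optional[Callable[[], str]] = None,
        queue: Optional[str] = None,
    ) -> None:
        # Assumption for this sketch: at least one source of a task ID is required.
        if schedule_task_id is None and task_id_function is None:
            raise ValueError("need schedule_task_id or task_id_function")
        self.schedule_task_id = schedule_task_id
        self.task_id_function = task_id_function
        self.queue = queue

    def resolve_task_id(self) -> str:
        """Use the fixed ID if one was given, otherwise call the function now."""
        if self.schedule_task_id is not None:
            return self.schedule_task_id
        return self.task_id_function()

    def launch(self) -> Dict[str, Optional[str]]:
        """Resolve the task ID exactly once and reuse it for the whole launch."""
        task_id = self.resolve_task_id()
        return {"task_id": task_id, "queue": self.queue}


calls: List[str] = []


def yourfunction() -> str:
    calls.append("called")  # record each invocation so repeats are visible
    return "some_task_id_string"


job = MiniJob(task_id_function=yourfunction, queue="default")
print(len(calls))   # nothing has run at registration time
launched = job.launch()
print(launched, len(calls))
```

Counting invocations like this is also a simple way to check, when testing the real scheduler, whether the function runs more than once per scheduled execution.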

jkhenning commented 7 months ago

Thanks @natephysics, we'll take a look at it soon!

eugen-ajechiloae-clearml commented 7 months ago

Also, can you please update https://github.com/allegroai/clearml/blob/master/examples/scheduler/cron_example.py to include a use of base_task_id_function?

jkhenning commented 6 months ago

@natephysics any update?

natephysics commented 6 months ago

Hi Jake.

Yes. I started implementing the suggested changes, but I had overlooked the point about not running the function multiple times. I fixed most of the issues and, in theory, the function should only run once, but when I test it, it still appears to run twice. I tried to figure out exactly why this is the case, but I ran out of time. I still plan to finish it, but I have to wrap up another project first.

In the worst case, I was thinking of splitting the preprocessing out from the function that returns a task ID. But I'd rather solve the problem directly if I can.