allegroai / clearml

ClearML - Auto-Magical CI/CD to streamline your AI workload. Experiment Management, Data Management, Pipeline, Orchestration, Scheduling & Serving in one MLOps/LLMOps solution
https://clear.ml/docs
Apache License 2.0
5.69k stars 655 forks

Stop the pipeline when task is aborted #1162

Open chmjelek opened 12 months ago

chmjelek commented 12 months ago

Hello, I would like to ask how to properly stop a pipeline when one of its tasks is manually aborted.

My pipeline:

from clearml import PipelineController

pipe = PipelineController(name="name", project="project_name")
# post_call is the callback shown further down
pipe.add_step(name="step_1", base_task_project="base_project", base_task_name="step one", post_execute_callback=post_call)
pipe.add_step(name="step_2", base_task_project="base_project", base_task_name="step two")

I would like to stop the pipeline after the first step and mark it as "aborted".

The first task is stopped with:

task.mark_stopped(force=True, status_message="stop reason")

My idea was to stop the pipeline from the callback passed as the post_execute_callback parameter:

def post_call(_pipeline, _node):
    # Stop the whole pipeline if the step that just finished was aborted
    if _node.status == "aborted":
        _pipeline.stop(mark_aborted=True)

The pipeline does stop, but I get the following exception:

Exception in thread Thread-7

RuntimeError: cannot join current thread

Why am I getting this error? Is there a better way to stop the pipeline?
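
For reference, the message "cannot join current thread" is what Python's threading module raises when a thread calls join() on itself. Below is a minimal sketch that reproduces the same error without ClearML. It assumes (I have not confirmed this against the ClearML source) that the callback runs on the pipeline's own background thread and that stop() tries to join that thread. MiniController and its methods are hypothetical names used only for illustration.

```python
import threading


class MiniController:
    """Hypothetical stand-in for a pipeline controller; not ClearML code."""

    def __init__(self, callback):
        self._callback = callback
        self._thread = threading.Thread(target=self._run)
        self.error = None

    def start(self):
        self._thread.start()

    def wait(self):
        # Called from the main thread, so joining the worker is fine here
        self._thread.join()

    def stop(self):
        # If this is called from the worker thread itself, join() raises
        # RuntimeError because a thread cannot wait for its own completion
        self._thread.join()

    def _run(self):
        try:
            # The callback is invoked on the worker thread (assumption)
            self._callback(self)
        except RuntimeError as exc:
            self.error = str(exc)


# The callback calls stop() from inside the worker thread
controller = MiniController(callback=lambda c: c.stop())
controller.start()
controller.wait()
print(controller.error)
```

If the assumption holds, this would explain why the pipeline still stops while the thread that ran the callback reports the exception.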