Barski-lab / cwl-airflow

Python package to extend Airflow functionality with CWL1.1 support
https://barski-lab.github.io/cwl-airflow
Apache License 2.0

Draft: Do not clear XCom if any step has failed #81

Closed: mpolykovskiy closed this pull request 2 years ago

mpolykovskiy commented 2 years ago

@michael-kotliar This is just my suggestion for how we could rerun failed stages. The root problem is that clean_up drops XCom data, so CWL cannot restore the working folders. If we don't clear XCom, a failed step can be rerun. However, this has a possible downside: temporary data from failed steps will accumulate and occupy disk space. I think that's acceptable, but I could be wrong.

portah commented 2 years ago

I like this update. With it, we keep tmp_directory in the following situations:

  1. if keep_tmp_data is set (for all runs)
  2. if keep_tmp_data is not set but a run has failed

It would still be great to have a way to not keep the data at all.

For example, as the default behavior, if keep_tmp_data is not set to True or False, we could use option 2, but if keep_tmp_data is explicitly set to False, we always clean up?

Is this a good idea, or am I overcomplicating things?
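The tri-state behavior proposed above can be sketched as a small decision function. This is a minimal sketch, assuming keep_tmp_data arrives as True, False, or None (meaning "not set"); the name should_clean_up is hypothetical and not part of cwl-airflow.

```python
from typing import Optional


def should_clean_up(keep_tmp_data: Optional[bool], run_has_failed: bool) -> bool:
    """Decide whether to remove temporary data and XCom for a DAG run.

    Hypothetical helper illustrating the proposal in this thread:
      True  -> always keep temporary data
      False -> always clean up, even if some steps failed
      None  -> option not set: keep data only when the run has failed steps
    """
    if keep_tmp_data is None:
        # Default behavior (option 2): clean up only successful runs
        return not run_has_failed
    # Explicit setting wins regardless of the run outcome
    return not keep_tmp_data
```

With this rule, an explicit False forces clean-up even for failed runs, while leaving the option unset preserves the data needed to rerun failed steps.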

michael-kotliar commented 2 years ago

@mpolykovskiy Thanks for your PR!

I really like the idea of keeping temporary data when the workflow has failed steps. I also agree with Andrey about having a way to force CWL-Airflow to remove temporary data regardless of whether any steps failed.

Currently, if keep_tmp_data is not set in airflow.cfg, we assume it is False, so we can't distinguish an explicit ("real") False from the default False. So, in addition to the changes in your pull request, I recommend a few more changes:

  1. Use CWL_KEEP_TMP_DATA = None instead of https://github.com/Barski-lab/cwl-airflow/blob/ca14232bbb78df242c841047318bcb67c775379f/cwl_airflow/utilities/cwl.py#L72
  2. Use False (to prevent our None from being converted to a boolean) instead of https://github.com/Barski-lab/cwl-airflow/blob/ca14232bbb78df242c841047318bcb67c775379f/cwl_airflow/utilities/cwl.py#L428
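To illustrate why a None default matters, here is a minimal sketch that uses Python's standard configparser rather than Airflow's own configuration API (an assumption made only for illustration; read_keep_tmp_data is a hypothetical name). It returns None when the option is absent, so an explicit False remains distinguishable from the default.

```python
import configparser
from typing import Optional


def read_keep_tmp_data(config: configparser.ConfigParser) -> Optional[bool]:
    """Return True/False if [cwl] keep_tmp_data is set, or None if it is absent.

    Hypothetical helper; cwl-airflow reads airflow.cfg through its own code.
    """
    if not config.has_option("cwl", "keep_tmp_data"):
        return None  # "default": the option was never set
    # "real" value: accepts true/false, yes/no, on/off, 1/0
    return config.getboolean("cwl", "keep_tmp_data")
```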

Then your clean_up function would look like this:

```python
import logging


def clean_up(context):
    try:
        default_cwl_args = get_default_cwl_args(
            context["dag"].default_args["cwl"]
        )
        dag_run = context["dag_run"]
        keep_tmp_data = default_cwl_args["keep_tmp_data"]
        # Clean up if removal is explicitly forced (False), or if the option
        # is not set (None) and no task in this run has failed
        if keep_tmp_data is False or (keep_tmp_data is None and not has_failed_tasks(dag_run)):
            remove_dag_run_tmp_data(dag_run)          # safe to run as it has its own exception handling
            for ti in dag_run.get_task_instances():
                ti.clear_xcom_data()
    except KeyError as err:                           # will catch if called from clean_dag_run
        logging.info(f"Failed to clean up data for current DAG, due to \n {err}")
```
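The suggested clean_up calls has_failed_tasks, whose body isn't shown in this thread. A minimal sketch, assuming it only needs to compare the state of each task instance returned by dag_run.get_task_instances() with Airflow's "failed" state string; the implementation in the PR may differ (for example, it might also treat upstream_failed as a failure).

```python
from typing import Any, Iterable

# Assumed to match the string value of Airflow's failed task state
FAILED = "failed"


def has_failed_tasks(dag_run: Any) -> bool:
    """Return True if any task instance of the DAG run ended in the failed state."""
    task_instances: Iterable[Any] = dag_run.get_task_instances()
    return any(ti.state == FAILED for ti in task_instances)
```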

Let me know whether these changes match the logic you want to achieve in this PR.

Michael

mpolykovskiy commented 2 years ago

@michael-kotliar Thank you for the feedback. I implemented your idea (and simplified it a bit); please check that I understood you correctly.

michael-kotliar commented 2 years ago

Awesome! Thanks!