rwth-i6 / sisyphus

A Workflow Manager in Python
Mozilla Public License 2.0

Allow hooking job failure for generic error handling #205

Closed (NeoLegends closed 1 month ago)

NeoLegends commented 2 months ago

Closes #179 Closes #204

now testing this

NeoLegends commented 1 month ago

I tested this and it works well. Do you think it's worthwhile adding default handlers that check for e.g. certain log file substrings and then automatically clear the error if they are present?
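A default handler of that kind could be built around a small substring predicate. The following is only a minimal sketch under assumptions: `log_matches`, `make_substring_check` and `is_cuda_error` are hypothetical names that are not part of Sisyphus, and the substrings are placeholders.

```python
from typing import Callable, Iterable


def log_matches(log_path: str, substrings: Iterable[str]) -> bool:
    """Return True if any log line contains one of the substrings (case-insensitive)."""
    needles = [s.lower() for s in substrings]
    try:
        with open(log_path, "rt", errors="replace") as log_file:
            return any(any(n in line.lower() for n in needles) for line in log_file)
    except FileNotFoundError:
        # No log file means there is nothing to match against.
        return False


def make_substring_check(substrings: Iterable[str]) -> Callable[[str], bool]:
    """Build a reusable predicate for one class of errors (hypothetical helper)."""
    needles = tuple(substrings)
    return lambda log_path: log_matches(log_path, needles)


# Hypothetical usage: a predicate for transient GPU errors.
is_cuda_error = make_substring_check(["cuda error", "cufft error"])
```

A handler would call such a predicate on the job's log file and clear the error only when it returns True, so unknown failures are still left for manual inspection.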

NeoLegends commented 1 month ago

As a first approximation of a proper error handling implementation for the CUDA errors I'm encountering at the moment, adding this snippet to settings.py works reasonably well:

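import gzip
import logging
import os
import shutil

ignore_error_cache = set()  # IDs of failed jobs that were inspected and were not CUDA errors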

def on_job_failure(job: "Job"):
    from i6_core.returnn import ReturnnTrainingJob

    if not isinstance(job, ReturnnTrainingJob):
        logging.debug(f"{job.job_id()}: error, but not a {ReturnnTrainingJob.__name__}, so not doing anything.")
        return
    elif job.job_id() in ignore_error_cache:
        return

    log_file_path = os.path.join(job.work_path(), "../log.run.1")
    with open(log_file_path, "rt") as log_file:
        is_cuda_err = any("cuda error" in line.lower() for line in log_file)

    if not is_cuda_err:
        logging.debug(f"{job.job_id()}: died but probably not due to a CUDA error, better go check by hand.")
        ignore_error_cache.add(job.job_id())
        return

    logging.info(f"{job.job_id()}: CUDA 💥, re-starting... 🔁")

    # archive log file
    i = 1
    cleared_log_path = None
    while cleared_log_path is None or os.path.exists(cleared_log_path):
        cleared_log_path = os.path.join(job.work_path(), f"../log.run.cuda-cleared.{i:04}.gz")
        i += 1
    with open(log_file_path, "rb") as log_in, gzip.open(cleared_log_path, "wb") as log_out:
        shutil.copyfileobj(log_in, log_out)
    os.remove(log_file_path)

    # re-schedule job
    for f in [
        os.path.join(job.work_path(), "../error.run.1"),
        os.path.join(job.work_path(), "../submit_log.run"),
    ]:
        try:
            os.remove(f)
        except FileNotFoundError:
            pass

albertz commented 3 weeks ago

I wonder: is this callback called all the time, and not just once on job failure? That is a bit unexpected to me. It also makes the logic much more complicated on the user side, e.g. you need to add this ignore_error_cache logic here. That logic is also wrong: once the user clears the error for this job and it continues to run, it might later run into a CUDA error, but then you would ignore it, because you never clear the ignore_error_cache here.
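The failure sequence can be reproduced with a toy model of the callback. This is only an illustrative sketch: `handle_failure` and the job ID are made up here, and the code does not touch Sisyphus itself.

```python
ignore_error_cache = set()  # same shape as in the snippet above


def handle_failure(job_id: str, is_cuda_err: bool) -> str:
    """Toy stand-in for the callback: returns what the handler would do."""
    if job_id in ignore_error_cache:
        return "ignored"
    if not is_cuda_err:
        # Remember the job so the repeated callback does not re-read the log.
        ignore_error_cache.add(job_id)
        return "left for manual inspection"
    return "restarted"


first = handle_failure("job-a", is_cuda_err=False)  # unrelated crash
# ... the user fixes the problem and clears the error by hand ...
second = handle_failure("job-a", is_cuda_err=True)  # later, a real CUDA error
print(first, "|", second)  # prints: left for manual inspection | ignored
```

The second call is ignored because the entry added by the first failure is never removed.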

albertz commented 3 weeks ago

Well, ok, checking the mtime of the error file would probably be better, if you want to keep the callback logic this way. Like:


ignore_error_cache = {}  # job_id -> err_mtime

# https://github.com/rwth-i6/sisyphus/pull/205#issuecomment-2368527715
def on_job_failure(job: "Job"):
    import gzip
    import logging
    import os
    from i6_core.returnn import ReturnnTrainingJob

    if not isinstance(job, ReturnnTrainingJob):
        return

    try:
        err_mtime = os.path.getmtime(os.path.join(job.work_path(), "../error.run.1"))
    except FileNotFoundError:
        return  # maybe was already cleared
    if ignore_error_cache.get(job.job_id()) == err_mtime:
        return

    log_file_path = os.path.join(job.work_path(), "../log.run.1")
    with open(log_file_path, "rt") as log_file:
        is_cuda_err = any(("cuda error" in line.lower() or "cuFFT error" in line) for line in log_file)

    if not is_cuda_err:
        logging.debug(f"{job.job_id()}: died but probably not due to a CUDA error, better go check by hand.")
        ignore_error_cache[job.job_id()] = err_mtime
        return

    ...
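The effect of keying the cache on the error file's mtime can be checked in isolation. The following is a minimal sketch, not the code from the snippet above: `is_new_failure` and `seen_errors` are hypothetical names, the mtimes are pinned with `os.utime` only to make the demo deterministic, and, unlike the snippet, the mtime is recorded on every inspection rather than only for non-CUDA errors.

```python
import os
import tempfile

seen_errors = {}  # job_id -> mtime of the error file that was already inspected


def is_new_failure(job_id: str, error_file: str) -> bool:
    """Return True once per distinct error file state, False on repeated calls."""
    try:
        err_mtime = os.path.getmtime(error_file)
    except FileNotFoundError:
        return False  # the error was already cleared
    if seen_errors.get(job_id) == err_mtime:
        return False  # same failure as last time
    seen_errors[job_id] = err_mtime
    return True


with tempfile.TemporaryDirectory() as tmp:
    error_file = os.path.join(tmp, "error.run.1")
    open(error_file, "w").close()
    os.utime(error_file, (1_000, 1_000))  # first failure
    results = [is_new_failure("job-a", error_file), is_new_failure("job-a", error_file)]
    os.remove(error_file)  # the user clears the error
    results.append(is_new_failure("job-a", error_file))
    open(error_file, "w").close()
    os.utime(error_file, (2_000, 2_000))  # the job fails again later
    results.append(is_new_failure("job-a", error_file))

print(results)  # prints: [True, False, False, True]
```

Because a later failure writes a new error file with a different mtime, it is handled again without the cache ever having to be cleared explicitly.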