dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

UnboundLocalError when collecting results #8514

Open Dominic-Stafford opened 8 months ago

Dominic-Stafford commented 8 months ago

Describe the issue:

We are using distributed to run a process on HTCondor. Our cluster is somewhat unreliable, so our jobs occasionally die for reasons we can't determine. We have therefore created a setup that resubmits jobs which fail. However, sometimes we get the following error in our process:

Traceback (most recent call last):
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/htcondor.py", line 265, in _dask_map
    result = task.result(timeout=1)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/client.py", line 322, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/utils.py", line 436, in sync
    return result
UnboundLocalError: local variable 'result' referenced before assignment

This gets repeated many times and stops the overall manager process. I think the root cause may be an issue with our cluster (since the failure is only intermittent), but could you please look into this UnboundLocalError, which I think is spurious?

Example:

Unfortunately, due to the intermittent nature of the issue, I haven't managed to create a minimal reproducible example. This is the event loop we use to gather the results:

        tasks = client.map(function, *iterables, pure=False, key=key)
        tasks_to_itemidx = dict(zip(tasks, range(len(tasks))))
        tasks = dask.distributed.as_completed(tasks)
        task_failures = {}
        for task in tasks:
            try:
                # This call should return immediately but sometimes Dask gets
                # stuck here. Unknown why. Specify timeout to circumvent.
                result = task.result(timeout=1)
            except asyncio.exceptions.TimeoutError:
                # Retry but silence the error
                tasks.add(self._dask_resubmit_failed_task(
                    function, task, tasks_to_itemidx, iterables, key))
            except Exception as e:
                logger.exception(e)
                failures = task_failures.get(task, 0)
                if self.retries is not None and failures >= self.retries:
                    raise
                logger.info(
                    f"Task failed {failures} times and will be retried")

                new_task = self._dask_resubmit_failed_task(
                    function, task, tasks_to_itemidx, iterables, key)
                task_failures[new_task] = failures + 1
            else:
                if result is None:
                    logger.error("Task returned 'None' (usually due to dask "
                                 "killing this worker).")
                    failures = task_failures.get(task, 0)
                    if self.retries is not None and failures >= self.retries:
                        raise RuntimeError(
                            "Number of retries was exceed by a task returning "
                            "'None'. This is usually due to dask killing a "
                            "worker for exceeding memory usage.")
                    logger.info(
                        f"Task failed {failures} times and will be retried")

                    new_task = self._dask_resubmit_failed_task(
                        function, task, tasks_to_itemidx, iterables, key)
                    task.cancel()
                    task_failures[new_task] = failures + 1
                else:
                    yield result
            del tasks_to_itemidx[task]
            if task in task_failures:
                del task_failures[task]

Environment:

fjetter commented 8 months ago

I could see this happening if the task (or anything in https://github.com/dask/distributed/blob/b03efeeda5f17c48d020f4820ff89a752beeaf89/distributed/utils.py#L401-L408) raises a BaseException. I don't know what _dask_resubmit_failed_task is doing, but if it is cancelling the task, that could explain it.
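
For illustration, here is a minimal, self-contained sketch of that failure mode, simplified to a single thread and not taken from the distributed source; the helper names are made up. A GeneratorExit or KeyboardInterrupt derives from BaseException but not from Exception, so it bypasses the except Exception handler, leaves result unassigned, and control still reaches return result:

    import sys


    def sync_like(work):
        # Simplified stand-in for a sync-style helper: run `work`, record an
        # Exception if one is raised, and otherwise return the result.
        error = None
        try:
            result = work()          # stand-in for `result = yield future`
        except Exception:
            # Only Exception subclasses are recorded here.
            error = sys.exc_info()
        except BaseException:
            # Swallowed only to mimic what the caller observes: the waiter
            # gets released (in the real helper this presumably happens in a
            # finally clause) even though neither `result` nor `error` was
            # ever bound.
            pass
        if error is not None:
            _, exc, tb = error
            raise exc.with_traceback(tb)
        return result                # UnboundLocalError on the BaseException path


    def dying_task():
        raise GeneratorExit("simulated interruption of the awaited work")


    sync_like(dying_task)  # UnboundLocalError: local variable 'result' referenced before assignment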

Dominic-Stafford commented 8 months ago

Hi @fjetter, thank you for the quick response. I changed https://github.com/dask/distributed/blob/c98cd09f2a6d4c9a7180deb9348c71477121f3da/distributed/utils.py#L409 to also catch base exceptions, and the error became:

Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/runproc.py", line 321, in <module>
    run_processor()
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/runproc.py", line 310, in run_processor
    output = runner(datasets, "Events", processor)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/coffea/processor/executor.py", line 1700, in __call__
    wrapped_out = self.run(fileset, processor_instance, treename)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/coffea/processor/executor.py", line 1782, in run
    chunks = self.preprocess(fileset, treename)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/coffea/processor/executor.py", line 1734, in preprocess
    self._preprocess_fileset(fileset)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/coffea/processor/executor.py", line 1461, in _preprocess_fileset
    out, _ = pre_executor(to_get, closure, out)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/executor.py", line 227, in __call__
    result = next(gen)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/executor.py", line 313, in _submit
    yield from self.cluster.process(function, items, key=tasknames)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/htcondor.py", line 339, in process
    yield from self._dask_map(function, *iterables, key=key)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/htcondor.py", line 265, in _dask_map
    result = task.result(timeout=1)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/client.py", line 322, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/utils.py", line 408, in f
    result = yield future
GeneratorExit

So it seems you are correct about this; however, I'm not really sure why this GeneratorExit is being raised. This is our code for _dask_resubmit_failed_task. It doesn't cancel the old task, but is it doing something else dangerous?

    def _dask_resubmit_failed_task(
            self, function, task, tasks_to_itemidx, iterables, key):
        # Rebuild the failed task's arguments from its original position
        # in the input iterables.
        idx = tasks_to_itemidx[task]
        item = (list(args)[idx] for args in iterables)
        if key is not None:
            # Give the retry a fresh key so it doesn't collide with the
            # key of the failed future.
            key = key[idx] + "-retry-" + str(uuid.uuid4())
        new_task = self.client.submit(function, *item, pure=False, key=key)
        tasks_to_itemidx[new_task] = idx
        return new_task
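
As background on the GeneratorExit itself: Python injects GeneratorExit into a suspended generator or coroutine at its yield point when that object is closed, either explicitly via close() or implicitly when it is garbage-collected. A minimal standard-library sketch, unrelated to the code above and with made-up names:

    def wait_for_value():
        try:
            # Stand-in for code suspended at something like `result = yield future`.
            value = yield
            print("resumed with", value)
        except GeneratorExit:
            # Injected here by Python when the generator is closed while suspended.
            print("GeneratorExit raised at the yield point")
            raise


    gen = wait_for_value()
    next(gen)    # advance to the yield, leaving the generator suspended
    gen.close()  # injects GeneratorExit at the suspension point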