dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Failure to pickle/unpickle in flight breaks the worker state machine #6705

Open crusaderky opened 1 year ago

crusaderky commented 1 year ago

Use case 1

A task in flight fails to unpickle when it lands; this triggers a GatherDepFailureEvent.
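For illustration (a contrived sketch, not taken from the issue), a value that pickles fine on the sending worker but raises when unpickled on the receiving worker could look like this:

```python
import pickle


def _boom():
    raise RuntimeError("cannot unpickle this object")


class FailsToUnpickle:
    def __reduce__(self):
        # Pickling succeeds and records _boom as the reconstructor;
        # unpickling then calls _boom() and raises.
        return (_boom, ())


blob = pickle.dumps(FailsToUnpickle())  # works on worker A
pickle.loads(blob)                      # raises RuntimeError on worker B
```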

Use case 2

A task in flight unpickles successfully when it lands; this triggers a GatherDepSuccessEvent. The task is larger than 60% of memory_limit, so it's spilled immediately. However, it fails to pickle back.
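For illustration (again a contrived sketch, not from the issue), this models an object that survives one pickle/unpickle round trip but refuses to be pickled a second time, which is what spilling would attempt:

```python
import pickle


class FailsToRepickle:
    """Pickles on worker A, unpickles on worker B, but cannot be pickled again."""

    def __init__(self):
        self._already_transferred = False

    def __getstate__(self):
        if self._already_transferred:
            raise RuntimeError("cannot pickle back after transfer")
        return {"_already_transferred": True}

    def __setstate__(self, state):
        self.__dict__.update(state)


blob = pickle.dumps(FailsToRepickle())  # 1. pickled on worker A: fine
landed = pickle.loads(blob)             # 2. unpickled on worker B: fine
pickle.dumps(landed)                    # 3. pickled again by the SpillBuffer: raises
```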

Use case 3

Network stack raises, as shown in #6877 / #6875. This is a bug in the network stack and should be fixed there.

Expected behaviour

The task is marked as erred on the worker. All waiters are taken care of (rescheduled?). The scheduler is informed.

This is where things get complicated: the scheduler receives an error event for a task that is in memory somewhere else. Does it need to try fetching it again, incrementing the failure counter? What happens to the task's dependents when the scheduler finally transitions it from memory to error (a transition that today does not exist)?

Actual behaviour

If validate=True, the worker shuts itself down with @fail_hard:

            for ts_wait in ts.waiting_for_data:
                assert self.tasks[ts_wait.key] is ts_wait
>               assert ts_wait.state in WAITING_FOR_DATA, ts_wait
E               AssertionError: <TaskState 'x' error>

If validate=False, the worker sends {op: task-erred} to the scheduler. It's unclear what happens next, considering that on the scheduler the task is in memory. This is untested.

#6703 introduces tests for both use cases (xfailed).

Note that there are no integration tests with the scheduler; they need to be added.
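For reference, such a scheduler integration test might look roughly like the sketch below. This is not taken from #6703; the test name, the expected exception, and the use of gen_cluster / xfail are all assumptions.

```python
import pytest

from distributed.utils_test import gen_cluster


def _boom():
    raise RuntimeError("cannot unpickle")


class FailsToUnpickle:
    def __reduce__(self):
        return (_boom, ())


@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/6705")
@gen_cluster(client=True)
async def test_gather_dep_unpickle_failure(c, s, a, b):
    # Produce the value on worker a, then run a dependent task on worker b,
    # so the key has to travel over the network and be unpickled there.
    x = c.submit(FailsToUnpickle, key="x", workers=[a.address])
    y = c.submit(lambda v: None, x, key="y", workers=[b.address])
    # Exactly what the client should see is part of the open design question above.
    with pytest.raises(RuntimeError):
        await y
```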

fjetter commented 1 year ago

I believe this is a duplicate of https://github.com/dask/distributed/issues/4439

The task is larger than 60% of memory_limit, so it's spilled immediately. However, it fails to pickle back.

I don't understand what "fails to pickle back" means and how this connects to the flight state; the unpickling error after deserialization would happen somewhere else. Also, how would it be possible that it only sometimes fails to unpickle?

crusaderky commented 1 year ago

I don't understand what "fails to pickle back" means and how this connects to the flight state; the unpickling error after deserialization would happen somewhere else.

The task is executed on worker A, then sent to worker B.

  1. It is pickled on worker A,
  2. transferred over the network,
  3. unpickled on worker B (by the network stack),
  4. and immediately pickled again on worker B if it's larger than 60% of memory_limit (by the SpillBuffer); see the sketch below.
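In plain pickle terms the sequence looks roughly like this (compute_something is a placeholder, and distributed's real serialization layer is more involved than bare pickle):

```python
import pickle


def compute_something():
    return {"x": 1}  # stand-in for the real task result on worker A


value = compute_something()

blob = pickle.dumps(value)  # 1. pickled on worker A
# 2. the bytes are transferred over the network
obj = pickle.loads(blob)    # 3. unpickled on worker B (use case 1 failures happen here)
pickle.dumps(obj)           # 4. pickled again by the SpillBuffer if the value is larger
                            #    than 60% of memory_limit (use case 2 failures happen here)
```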

Also, how would it be possible that it only sometimes fails to unpickle?

In the sense that it pickles/unpickles fine on A, but fails on B. This is a very common pain point in libraries where the behaviour of pickling/unpickling individual objects is altered by global state; Pint (https://pint.readthedocs.io/en/stable/), to name one.
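As a contrived sketch of that failure mode (this is not how Pint actually implements it), unpickling can depend on process-global state that exists in the sending process but not in the receiving one:

```python
import pickle

UNIT_REGISTRY = {}  # process-global state; imagine worker A has it populated


class Quantity:
    def __init__(self, magnitude, unit):
        self.magnitude = magnitude
        self.unit = unit

    def __getstate__(self):
        return {"magnitude": self.magnitude, "unit": self.unit}

    def __setstate__(self, state):
        self.magnitude = state["magnitude"]
        # Unpickling needs the global registry; a process that never set it up
        # (worker B) raises here and the transfer fails.
        self.unit = UNIT_REGISTRY[state["unit"]]


UNIT_REGISTRY["m"] = "meter"
blob = pickle.dumps(Quantity(1.0, "m"))  # pickled on worker A, where the registry is set up

UNIT_REGISTRY.clear()                    # worker B never populated the registry
pickle.loads(blob)                       # raises KeyError: 'm'
```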