
Distributed scheduler failure case #5453

Open jmichel-otb opened 2 years ago

jmichel-otb commented 2 years ago

What happened:

This is a (very) simplified version of a distributed scheduler failure that happens in CARS. Even though the memory needed to compute the task graph is high and the memory resources are limited, there is a task order that guarantees the computation will succeed, but the scheduler does not find it.

Minimal Complete Verifiable Example:

from typing import List
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from time import sleep
import numpy as np
dask.__version__
'2021.09.1'

First, we start a distributed cluster with 2 workers, each with a single thread and 400 MB of RAM.

cluster = PBSCluster(
    project='scheduler-failure',
    n_workers=2,
    processes=1,
    cores=1,
    memory='400M',
    local_directory='$TMPDIR',
    log_directory='$TMPDIR',
    walltime='00:30:00',
    interface='ib0',
)
client = Client(cluster)
cluster
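For readers without access to a PBS system, a similar setup should be reproducible with a plain LocalCluster using the same limits (a sketch added for illustration; not part of the original report).

from dask.distributed import LocalCluster

# Two single-threaded workers, each limited to 400 MB, mirroring the PBS setup above
# (illustrative alternative; the original report uses PBSCluster).
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='400MB')
client = Client(cluster)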

The A() function generates a 90 MB NumPy array.

def A(mem: int = 90):
    sleep(2)
    # mem MB worth of float64 values (8 bytes each)
    l = int((mem * 10**6) / 8)
    return np.random.rand(l)

The B() function sums all the arrays it receives, and then all the elements of the result. It is only meant to represent a function that consumes all of its input data and outputs a single value (a reduce-type task).

def B(data:List[np.ndarray]):
    sleep(2)
    return sum(data).sum()

We generate 20 delayed A() tasks and 5 B() tasks, each B() consuming 4 different A() tasks. The final B() call is for graph visualization purposes only.

nb_A_tasks = 20
nb_A_tasks_by_B_tasks = 4
overlap=0
A_tasks = [dask.delayed(A)() for i in range(nb_A_tasks)]
B_tasks = [dask.delayed(B)(A_tasks[i:i+nb_A_tasks_by_B_tasks]) for i in range(0,nb_A_tasks, nb_A_tasks_by_B_tasks-overlap)]
C_tasks = dask.delayed(B)(B_tasks)

The task graph looks fine. The ordering is logical: compute 4 A() tasks and then the dependent B() task, before moving on to the next 4 A() tasks. This order ensures that A() results are released as early as possible.

C_tasks.visualize(rankdir='TB', color='order')

[Task graph visualization (output_13_0), coloured by task order]
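As a cross-check (added for illustration; not in the original notebook), the static priorities that dask.order assigns can be inspected directly, and they interleave the A() and B() tasks as the coloured graph suggests.

from dask.order import order

# Materialize the high-level graph into a plain dict and ask dask.order for
# the static priorities (lower number = scheduled earlier).
dsk = dict(C_tasks.__dask_graph__())
priorities = order(dsk)
for key, priority in sorted(priorities.items(), key=lambda kv: kv[1]):
    print(priority, key)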

Even though our cluster has very limited resources, the task ordering in the graph above would ensure the computation succeeds.

s = client.compute(C_tasks)

But this is not what happens: the scheduler tries to perform all the A() tasks first, which cannot succeed because they represent 1800 MB of memory in total while our cluster only has 800 MB. This is shown in the dashboard graph below:

[Dashboard screenshot (all_A): all A() tasks are scheduled before any B()]

As a result, the workers restart a few times after reaching the memory limit threshold, and the future is eventually marked with an error status.

[Dashboard screenshot (failure): workers restarting and the computation failing]

The exception shows that the worker has been killed:

s.exception()
distributed.scheduler.KilledWorker('A-88b72ed9-4f92-437a-ba1f-e9242cd66984',
                                   <WorkerState 'tcp://10.135.36.49:34123', name: PBSCluster-0, memory: 0, processing: 7>)
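To make the memory budget explicit, here is the back-of-the-envelope arithmetic behind the numbers above (simply restating figures from this report).

# Memory budget (figures taken from the report above)
n_A = 20                    # number of A() tasks
mb_per_A = 90               # each A() result is ~90 MB
cluster_memory = 2 * 400    # 2 workers x 400 MB = 800 MB

all_A_at_once = n_A * mb_per_A   # 1800 MB if every A() result is held in memory
per_B_batch = 4 * mb_per_A       # ~360 MB if each B() runs right after its 4 A()s

print(all_A_at_once, ">", cluster_memory)  # 1800 > 800: computing all A() first cannot fit
print(per_B_batch, "<", cluster_memory)    # 360 < 800: batch-by-batch execution fits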

What you expected to happen: Of course, the scheduler cannot know in advance how much memory A() and B() will produce. We could use bind to try to influence the task order, at the expense of parallelism, but this practically amounts to handling task ordering by hand (a rough sketch is given below) ... There are several things that could be improved:
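Regarding the bind workaround mentioned above, here is roughly what it could look like (a sketch for illustration only, assuming dask.graph_manipulation.bind and the variables defined earlier; it serialises the batches and therefore gives up inter-batch parallelism).

from dask.graph_manipulation import bind

# Rebuild the graph so that each batch of 4 A() tasks also depends on the
# previous B() result, forcing batch-by-batch execution (illustration only).
B_tasks = []
previous_B = None
for i in range(0, nb_A_tasks, nb_A_tasks_by_B_tasks):
    batch = [dask.delayed(A)() for _ in range(nb_A_tasks_by_B_tasks)]
    if previous_B is not None:
        # bind() returns clones of `batch` that additionally depend on previous_B
        batch = bind(batch, previous_B)
    previous_B = dask.delayed(B)(batch)
    B_tasks.append(previous_B)

C_tasks_bound = dask.delayed(B)(B_tasks)
s = client.compute(C_tasks_bound)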

Anything else we need to know?:

The CARS problem is actually more complex than that:

Environment:

zawadzl commented 2 years ago

@jmichel-otb thank you for this issue. I confirm I witnessed the very same puzzling behavior of dask in this case. Allowing users to provide some intel to the cost function would be a great addition for the remote sensing community! Cheers

fjetter commented 2 years ago

Thank you for the thorough report. This is unfortunately a known issue we sometimes refer to as "root task overproduction". While the task ordering (i.e. the task priorities) is perfect, the cluster takes many other variables into account when deciding what to schedule when. At the most fundamental level, the problem arises because we only submit tasks to workers once we know a task can actually be computed, i.e. once all of its dependencies are in memory. In your case, all the A() tasks can be computed immediately since they have no dependencies, so the workers build up a queue of many A tasks. Once a batch finishes and a B is ready, it is assigned to a worker. While this is happening, the worker has already started on a new batch of As. B will not wait for this entire queue to be worked off, of course; it will cut in line, but it will not abort the currently executing As. Therefore, you will compute at least threads_per_worker too many As before the first reduction task B starts.
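For illustration only (not part of the comment above), a crude way to observe this overproduction is to print from the tasks and then count, in the worker logs (with the PBSCluster above, in the configured log_directory), how many A() tasks finish before the first B() starts.

# Instrumented variants of A() and B(); the print() output ends up in the
# worker logs, where "A done" lines appearing before the first "B start"
# line correspond to overproduced A() tasks.
def A_logged(mem: int = 90):
    sleep(2)
    arr = np.random.rand(int(mem * 10**6 / 8))
    print("A done")
    return arr

def B_logged(data: List[np.ndarray]):
    print("B start")
    sleep(2)
    return sum(data).sum()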

There are principally two ways to fix this problem, and many nuances of both flavours have been proposed over time.

The most promising fix so far is the second approach; see also https://github.com/dask/distributed/issues/3974. This is on our roadmap, but I do not have an ETA for you yet. However, we recently merged a pretty significant refactoring of our worker code which lays the groundwork for this: https://github.com/dask/distributed/pull/5046