dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Dynamic adaptation of average task duration #3516

Open gsailer opened 4 years ago

gsailer commented 4 years ago

We are currently seeing an issue with very long-running tasks when using adaptive_target to figure out how many workers the Dask graph needs. The estimate from adaptive_target is wrong as long as no task of the TaskPrefix has been executed yet. Could we change the estimate to correct itself dynamically and propagate a new average across the cluster as soon as the execution of one task takes, e.g., twice the estimated time?
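For context, a minimal sketch of the kind of estimate adaptive_target makes. estimate_workers, the 0.5 s default guess, and the 5 s target are illustrative assumptions, not the scheduler's actual code:

import math

UNKNOWN_TASK_DURATION = 0.5  # guessed duration (s) before any task of a prefix has run

def estimate_workers(queued_tasks_by_prefix, avg_duration_by_prefix,
                     target_duration=5.0):
    """Workers needed to finish queued work within target_duration seconds,
    given per-prefix average task durations."""
    total_expected = sum(
        count * avg_duration_by_prefix.get(prefix, UNKNOWN_TASK_DURATION)
        for prefix, count in queued_tasks_by_prefix.items()
    )
    return max(1, math.ceil(total_expected / target_duration))

# Before any task finishes, a prefix whose tasks really take an hour is
# counted at the 0.5 s guess, so the target is far too small:
estimate_workers({"train": 100}, {})                  # -> 10
estimate_workers({"train": 100}, {"train": 3600.0})   # -> 72000

This illustrates the issue: until the first task of a prefix completes, there is no measured duration to feed into the estimate.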

gsailer commented 4 years ago

Do you see any chance of adding timing logic around the ThreadPoolExecutor on the worker and creating a new endpoint on the scheduler that receives updated timing info for tasks and then propagates it to all other workers? Roughly like this:

import time
from tornado.ioloop import PeriodicCallback

# When a task is submitted to the worker's executor, start a timer that
# periodically checks how long the task has been running.
start_time = time.time()
future = executor.submit(function, *args, **kwargs)
pc = PeriodicCallback(lambda: self.threadpool_timing_callback(start_time), 1000)
pc.start()

def threadpool_timing_callback(self, start_time):
    # some_boundary, update_average, notify_scheduler, task_key and
    # new_average are placeholders for logic that does not exist yet
    if time.time() - start_time > some_boundary:
        update_average()
        notify_scheduler(task_key, new_average)

https://github.com/dask/distributed/blob/master/distributed/worker.py#L2309-L2319

mrocklin commented 4 years ago

Could we change the estimate to correct itself dynamically and propagate a new average across the cluster as soon as the execution of one task takes, e.g., twice the estimated time?

Sure. I think that we currently do something similar with work stealing. Maybe?

It is a little bit hard because we don't know when the first task of a prefix has actually started running, only when it has been sent to a worker.

Or perhaps this is the motivation for your second comment, about including some signal in the worker that talks back to the scheduler once a task has started processing? Was this what you meant?

mrocklin commented 4 years ago

If so, the concern here is that we're increasing communication by a non-trivial amount. Sending a new message for every task is doable, but it does add meaningful cost.

gsailer commented 4 years ago

Or perhaps this is the motivation for your second comment, about including some signal in the worker that talks back to the scheduler once a task has started processing? Was this what you meant?

Yes, timing around the executor would be a way to solve the problem of knowing when tasks actually start running.

Assuming that task durations within one TaskPrefix are roughly normally distributed, I think the number of additional messages could be limited to O(prefixes * duration corrections * workers): a worker would only send a message when its currently executing task exceeds the boundary, the boundary would adapt dynamically with an exponential backoff, and the new expected duration would then be propagated to all other workers via the scheduler.
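To make that concrete, a rough sketch of the backoff scheme described above. watch_running_task and notify_scheduler are hypothetical stand-ins, not existing distributed APIs:

import threading
import time

def watch_running_task(task_key, estimated_duration, notify_scheduler):
    """Hypothetical worker-side watcher. While the task runs, it notifies the
    scheduler each time elapsed time crosses 2x, 4x, 8x, ... the estimate, so
    a task taking T seconds sends only O(log(T / estimate)) corrections."""
    start = time.time()
    stop = threading.Event()

    def check():
        boundary = 2 * estimated_duration
        while not stop.wait(1.0):  # poll once per second until stopped
            elapsed = time.time() - start
            if elapsed > boundary:
                notify_scheduler(task_key, elapsed)  # revised lower bound
                boundary *= 2  # exponential backoff between messages

    threading.Thread(target=check, daemon=True).start()
    return stop  # the worker sets this event when the task finishes

The backoff is what keeps the message count bounded: a task running 100x longer than estimated triggers only about seven corrections rather than one per polling interval.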

I will have to look into how work stealing is done, but in general this problem seems pretty complex to me, and I would very much appreciate help with the general design and integration, as I'm not experienced with distributed systems or the design approaches and pitfalls that come with them.

mrocklin commented 4 years ago

Yes, I agree that this seems complicated. It also isn't a common issue (although I acknowledge that it is a serious one when it does occur). Because it is not so common, any solution would need to avoid adding much code complexity, especially in the common case.

I would very much appreciate help with the general design and integration, as I'm not experienced with distributed systems

In principle I would love to be able to offer this help, but my maintainer time is limited.