dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Semaphore and other objects as task condition #4104

Open pvanderlinden opened 4 years ago

pvanderlinden commented 4 years ago

When you are using the builtin Semaphore to limit task concurrency of a certain type, there are currently two behaviours you can choose:

Unless there is a different option, the only solution would be to make it possible to add a semaphore or similar primitives to be added as a condition to a task (like the result of another task is a condition before the task can be scheduled).

pvanderlinden commented 4 years ago

This actually seems to happen more then it doesn't. New tasks are getting scheduled on the workers which don't do anything but have tasks waiting for Semaphores. While there are 6 workers, only 2 workers will do anything because of this issue. The Semaphore will also prevent task priorities to work properly (as they might get scheduled first, but might not get the lease first).

Is there any advice on how to get this working while limiting certain types of tasks?

pvanderlinden commented 4 years ago

Although I'm not sure this is the full story. I can also see tasks which are marked as ready, and don't get run at all, while only 2 tasks are actually running with 6 workers. I'm not sure why this is?

pvanderlinden commented 4 years ago

test3

An example of a timeline:

This means there are long periods of time where other tasks are not started while there are plenty of workers and plenty of "semaphore slots" available.

Some remarks:

I think my issue is two fold:

pvanderlinden commented 4 years ago

@mrocklin @fjetter any thoughts on the above issues with Semaphores?

fjetter commented 4 years ago

First things first, the scheduler is actually agnostic about the semaphore values and the task transitioning and work assignment doesn't know about this, neither does the work stealing or any other internal administration. So there is definitively a lot of potential for improved scheduling if the synchronization objects were implemented as task annotations or conditions. Effectively only transitioning as many tasks to processing as are allowed by the semaphore value. From what I understand, that would require a vastly different implementation and I don't feel confident at the moment to suggest what would be required to enable this. I think before this can be enabled there is a lot of ground work needed to be done up front.

Regarding your example, I can't really tell what's going on. Just looking at the dashboard I would assume something is broken since during the first period of the long yellow task, something else should be able to compute. Generally, the task progress view should show a task which is blocked by a lock/semaphore as "computing". I cannot really tell how the system behaves if seceding comes into play. I could imagine a few scenarios where we cause deadlocks of some sort to appear, or at the very least causing congestions. I also don't know how seceding would show up on that graph. Do you observer the same without seceding?

pvanderlinden commented 4 years ago

@fjetter I'm using the Semaphore's to limit the usage of an SQL usage and other resources (not per worker resources, otherwise I would use the builtin resource feature). The issue without seceding is that I would not process things in parallel much as all workers will be busy working on waiting for the semaphore. The scenario: I schedule 60+ tasks with the same Semaphore, each of those tasks then have follow up tasks, with or without their own Semaphore. If I would not secede, all workers will just process a task waiting on a Semaphore. I can not just deploy more workers, as it would be a big waste (the tasks are large, so I would need 60+ workers with several GB's of memory just to have a parrellization of about 5-6 tasks).

My graph is actually not the the dashboard, I can't really get a decent dashboard output for this (the full graph takes several hours). But what basically happens: there are many tasks running which are seceded and trying to acquire a semaphore on multiple workers. When a task completes, the random tasks get's it semaphore. Then that tasks tries to rejoin, but if the task got the semaphore happens to be on a busy worker, it won't do anything until the other task finished.

Basically I'm trying to use the semaphore to limit the amount of tasks of a certain type running at the same time. But I don't see a way to do this correctly with the current Semaphore's or other feature?

pvanderlinden commented 4 years ago

Also I do understand that the scheduler is agnostic about the Semaphore, which is basically what is causing the issue. I can't see a way of using the Semaphore's without having the scheduler being aware of the Semaphore's.

I'm also trying to find a work around, as this is a big issue.

fjetter commented 4 years ago

The thing about seceding is that on a given worker multiple threads may secede and while doing so acquiring a lease. If only one of them can rejoin, this effectively leads to resource starvation which is what might be happening in the case of these very heavy yellow computations.

I don't have much experience with the seceding approach. I actually have no idea about how it is implemented, but from a conceptional point of view, if we could timeout the rejoin and would release the semaphore if the rejoin did not succeed, this might free up this deadlock situation. I can't say if this is feasible or possible, though.

Taking a step back, A rather practical approach I would suggest is to fuse the tasks into one computation. This ensures that Green+Black+Yellow is always executed on the same worker, always immediately after the previous one finishes. If I understood the problem correctly, this might help and would be less complex to implement.

pvanderlinden commented 4 years ago

The main reason (green-black-yellow) are separate tasks is because they have way different resource requirements and more black and yellow can run at the same time (then green). The second issue is that some yellow tasks are going to depend on black tasks (other then their "own" black task).

pvanderlinden commented 4 years ago

The other work around would be (not great though), to not secede, and use worker resources. That way I can limit which workers run which tasks. It is not a great work around, but it might solve my issue till the Semaphores & Scheduling work together. I need to test if this works, and also it makes the workers non generic. The other way might be to build more like a queuing system, like the feature Dask had before. It would make it difficult to build the graph though.

pvanderlinden commented 4 years ago

@fjetter There is basically no way of limiting concurrency of tasks correctly as I understand it now?

fjetter commented 4 years ago

no way of limiting concurrency of tasks correctly as I understand it now?

None of the synchronization objects (Events, Locks, Semaphores) are designed to limit the concurrency of tasks but rather about limiting concurrent code execution of code within a task. I actually believe there are valid applications for both but what you are requiring is definitely out of scope of the current implementation.


One additional thing you might consider trying to mitigate the "worker is blocked" issue, is the Reschedule exception.

something like

from distributed import Reschedule
def func(sem):
    try:
        sem.acquire(timeout="10s")  # Or however much you're willing to sacrifice
    except TimeoutError:
        raise Reschedule
    try:
        # do stuff to your DB
    finally:
        sem.release()

This way a task will only wait and block the worker pool for a limited amount of time and is scheduled later on again. Not sure how robust the rescheduling actually is but conceptionally this would avoid the "Multiple threads want to rejoin" issue we discussed above.

pvanderlinden commented 4 years ago

Thanks, I will experiment with the reschedule mechanism.

pvanderlinden commented 4 years ago

Fixed an issue in the code:

from distributed import Reschedule
def func(sem):
    if not sem.acquire(timeout="1s")  # Or however much you're willing to sacrifice
        raise Reschedule
    try:
        # do stuff to your DB
    finally:
        sem.release()

@fjetter Unfortunately this doesn't work. It still keeps the workers busy waiting on Semaphores while "real" tasks are still waiting for a worker.

pvanderlinden commented 4 years ago

Is it possible to modify the rejoin function to be able to timeout? This way it would work like this:

pvanderlinden commented 4 years ago

I thought I found a solution, but this makes the issue even worse, and I don't understand why. I would expect the submitted task to be scheduled on a different worker. and the worker "acquiring and releasing the semaphore" secedes, and is "free" to pick up tasks. Instead the functions submitted by this code don't always get scheduled while there are free workers available. Does this mean there is an even bigger scheduling issue, or am I missing something?

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with dd.worker_client(separate_thread=True) as client:
            sem = dd.Semaphore(name=name, max_leases=max_running)
            with sem:
                future = client.submit(apply, func, args, kwargs)
                return future.result()

    return wrapped
lr4d commented 4 years ago

I schedule 60+ tasks with the same Semaphore, each of those tasks then have follow up tasks, with or without their own Semaphore. If I would not secede, all workers will just process a task waiting on a Semaphore

Have you looked into fusing? Depending on the fusing parameters, you will be executing some of the (logical) tasks after the task involving a semaphore as part of a single (actual) task.

You could also look into using futures.

pvanderlinden commented 4 years ago

@lr4d Fusing would only complicate things. They are split because they have completely different scheduling requirements. And it will make it impossible to describe dependencies.

What do you mean with using futures? I'm scheduling tasks like that in the example. But for some reason Dask doesn't schedule the tasks on the 5 idle workers.

pvanderlinden commented 4 years ago

The main reason to fuse, is if you have many very small tasks which take more time to schedule then to compute. The graph I'm talking about has about 8 hours in 180 tasks on average, which is several minutes per task.

fjetter commented 4 years ago

I thought about the rescheduling again and realised this could never work. The task priorities would still enforce the same tasks to be rescheduled over and over again. Have you considered giving the follow up tasks higher priorities (maybe in combination w/ rescheduling)? Different priorities, of course, could then just flip the problem, s.t. the follow up tasks block. Maybe there is some sweet spot when putting application knowledge into it.

Apart from this, I don't see any more space. What you are asking for is something where we require task annotations and this is something currently in design, see #6701. Once these annotations arrive we can revisit the semaphore implementation but for now this is all we got.

s it possible to modify the rejoin function to be able to timeout?

Not sure if this helps or if it is possible. I'm actually not even sure why the rejoin blocks in the first place. To my understanding the seceded thread should not stop working but just work on the task it had assigned. Therefore, you should have more than one task on a given machine running.

pvanderlinden commented 4 years ago

Rejoin is ensuring you don't run multiple tasks on a single worker.

Do you have any idea why submitting a task from a task won't schedule it on any of the idle workers, but instead puts them almost all on a single worker, making the issue even worse? @fjetter

lr4d commented 4 years ago

I'm scheduling tasks like that in the example

My bad, didn't go through the whole thread.

What do you mean with using futures?

I see you're already doing this, my idea was something like the below. (Mostly pseudocode, don't expect this to work). e.g.

data = [ ... ] # list of futures
max_leases = n

processed_data = [client.submit(semaphore_using_fn, data.pop(), semaphore=semaphore) for i in range(max_leases)]

# Start running whatever tasks on `processed_data`
...

while data:
   if semaphore.get_value():  # leases available
      processed_data.append(client.submit(semaphore_using_function, data.pop(), semaphore=semaphore))
      # Again, you can schedule a future for the next batch of `processed_data`
      ...
      time.sleep(0.1) # don't choke scheduler
pvanderlinden commented 4 years ago

@lr4d I have two versions trying to limit concurrencies of certain tasks:

production:

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        sem = dd.Semaphore(name=name, max_leases=max_running)
        dd.secede()
        with sem:
            dd.rejoin()
            return func(*args, **kwargs)
    return wrapped

Which has the issue of giving the Semaphore some of the time to a worker which is busy, making it impossible to rejoin, and not really distributing the work.

Attempt 2:

def limit_concurrency(name, max_running=1, func=None):
    if func is None:
        return functools.partial(limit_concurrency, name, max_running)

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with dd.worker_client(separate_thread=True) as client:
            sem = dd.Semaphore(name=name, max_leases=max_running)
            with sem:
                future = client.submit(apply, func, args, kwargs)
                return future.result()

    return wrapped

This version seems to trigger a bug in the scheduler, as 4/5 workers are not doing any work most of the time, while there are available tasks ready to run (but submitted by this task). This also makes me unsure about my other usage of the worker_client, won't that also schedule tasks on mostly one worker? I can try to make an isolated example if that helps.

If I look at your example, I have 2 questions: Wouldn't it also produce the same issue where it schedules tasks on mostly one worker? Secondly it makes submitting the follow up tasks difficult, as I have to check for each tasks if the futures belonging to it's dependencies exists already (use multithreading, queues, etc).

lr4d commented 4 years ago

@pvanderlinden As @fjetter states earlier, there is no supported way to accommodate this use case atm. The code example I shared was just an idea that came to my mind, due to issues with state and parallel operations, I wouldn't expect it to work "properly" either.

pvanderlinden commented 4 years ago

@lr4d Do you have any idea why the client.submit only/mostly seems to submit tasks to the current worker only?. I literally have 4/5 workers doing nothing (only running deceded tasks) and a long backlog of tasks on one worker. Do I have to give it some hint to schedule it somewhere else?