dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Dask coroutines #1663

Open mrocklin opened 6 years ago

mrocklin commented 6 years ago

There are currently a few ways to construct highly dynamic workloads, where the graph can change during computation. This includes operations like get_client(), calling dask.compute within a task, using futures and as_completed, and so on.

Sometimes these workloads can grow complex and difficult to reason about (see https://github.com/dask/distributed/issues/1424). Are there better programming interfaces we could present to users that still cover the same options, but perhaps guide users toward correct behavior?

Asynchronous projects like Tornado/Asyncio/Curio/Trio seem to prefer a coroutine-style approach. Is this a possible option for a distributed runtime like Dask? If so, what would it look like?

Here are a couple of toy problems that come up frequently, and some naive thoughts on how they might look as coroutines.

Fibonacci

@dask.coroutine
def fib(i):
    if i < 2:
        return i
    else:
        a, b = yield [fib(i - 1), fib(i - 2)]
        return a + b

Evaluating on a remote list of unknown size

@dask.coroutine
def generate_data():
    return list(range(random.randint(0, 10)))  # a list of data of unknown length

@dask.coroutine
def inc(x):
    return x + 1

@dask.coroutine
def my_len(L):
    return len(L)

@dask.coroutine
def my_sum(L):
    return sum(L)

@dask.coroutine
def process_all():
    L = generate_data()
    n = yield my_len(L)
    processed = [inc(L[i]) for i in range(n)]
    total = yield my_sum(processed)
    return total

There are problems with both examples. They also don't represent the full space of complexity that existing solutions can cover. Broad thoughts on this topic are welcome.
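
For contrast, here is a rough sketch of how the Fibonacci case can be written today against the existing futures interface (submitting tasks from tasks with get_client, secede, and rejoin). It is meant only as a point of comparison with the proposed coroutine syntax, not as part of the proposal.

from dask.distributed import Client, get_client, secede, rejoin

def fib(i):
    if i < 2:
        return i
    client = get_client()                   # client available inside a running task
    a_fut = client.submit(fib, i - 1)       # recursively submit subtasks
    b_fut = client.submit(fib, i - 2)
    secede()                                # free this worker thread while waiting
    a, b = client.gather([a_fut, b_fut])
    rejoin()                                # wait for a free slot before resuming
    return a + b

if __name__ == "__main__":
    client = Client()                       # local cluster, for illustration only
    print(client.submit(fib, 10).result())  # 55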

cc @ogrisel @pitrou @remram44 @adamklein

asford commented 6 years ago

Edit: On rereading, it seems like this idea belongs in #1424. Happy to move it there if you all would prefer. Edit: Clarified the use of worker_client rather than get_client.

TLDR

Introducing promises into the dask scheduler, akin to C++'s promise/future interface, could provide the ability to "rewrite" the task graph below a promise, and would be an extremely useful feature when scaling with dynamic tasks.

I've run into a potentially related issue while implementing a CPU-bound distributed scatter-gather style operation in dask using the worker_client feature. I hope the write-up below is useful, though it isn't directly related to the coroutine feature you've proposed.

The workload I have is relatively similar to the "list of unknown size problem" you've described above, in that a single "logical task" involves:

  1. Performing a cpu-bound operation to identify zero-to-thousands of "subtasks". (len)
  2. Performing another cpu-bound operation on each subtask. (inc)
  3. Aggregating subtask results via another cpu-bound operation. (sum)

For clarity, this is roughly implemented as:

@dask.delayed
def initial(input):
    ....

def subtask(sub_input):
    ....

def aggregate(sub_results):
    ....

@dask.delayed
def subprocess(initial_results):
    with worker_client() as c:
        distributed_results = c.scatter(initial_results)
        sub_tasks = c.map(subtask, distributed_results)
        sub_results = c.gather(sub_tasks)

    return aggregate(sub_results)

def logical_task(x):
    initial_future = initial(x)
    result_future = subprocess(initial_future)
    return result_future

In the current dask implementation, the task graph is composed entirely of futures, and the subprocess task thread (or coroutine) serves as a long-lived representation of the "expanded" future in the task graph. The root subprocess task is marked as in-progress while a new computational graph of subtasks is assembled, submitted to the scheduler, and then gathered from the distributed system.
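
For completeness, driving a batch of these logical tasks from the client side looks roughly like the sketch below ("inputs" and the scheduler address are placeholders; a distributed Client is assumed, since worker_client only works on dask workers):

from dask.distributed import Client

client = Client("scheduler-address:8786")             # placeholder address
delayed_results = [logical_task(x) for x in inputs]   # one Delayed per logical task
futures = client.compute(delayed_results)             # submit the whole batch at once
results = client.gather(futures)                      # block until everything finishes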

I initially implemented this via worker_client but found that it caused major scaling issues when processing a large number of "logical tasks", as each logical task requires a long-lived client connection to the core scheduler for the duration of the worker_client context. This quickly overloads the scheduler when there are a large number of potentially-concurrent logical tasks to execute.

Introducing the concept of a promise into the dask graph could more effectively describe this distributed computation. In this context, a promise would be a node within the task graph on which tasks may depend, but whose dependencies may vary. Executing such a task would then involve temporary creation of a worker client, dynamic initialization of the subgraph, and an update of the promise's dependencies to point at the new subgraph. The promise reference would then function similarly to a persistent client reference, triggering execution of the subgraph and fulfillment of the promise when its current dependencies finish.

The API might look like:


@dask.delayed
def initial(input):
    ....

@dask.delayed
def subtask(sub_input):
    ....

@dask.delayed
def aggregate(sub_results):
    ....

@dask.delayed
def subprocess(initial_results, result_promise):
    with get_client() as c:
        # Create dynamic subgraph of subtasks and aggregation
        distributed_results = c.scatter(initial_results)
        sub_task_delayed = list(map(subtask, distributed_results))
        result_delayed = aggregate(sub_task_delayed)

        # Rewrites the promise to depend on the "result_delayed" task, instead
        # of subprocess, and triggers subgraph execution ala "persist".
        # Execution of the "aggregate" task then fulfills the promise.
        result_promise.resolve_with(result_delayed)

        # Release client
        return

def logical_task(x):

    # Create standard task for initial work
    initial_future = initial(x)

    # Create promise node within the graph
    result_promise = Promise()

    # Mark promise as "dependent" on subprocess, passing the promise handle
    # into subprocess to allow fulfillment.
    result_promise.resolve_with(subprocess(initial_future, result_promise))

    return result_promise

Execution of "logical_task" would result in several graph states:

(A) Starting graph:

initial--> subprocess--> result_promise --> ...

(B) Initial is finished (f), subprocess executing (e):

initial(f)--> subprocess (e)--> result_promise--> ...

(C) Graph rewrite on "resolve_with" in subprocess

initial(f)--> subprocess (e)
          --> [subtask]--> aggregate--> result_promise--> ...

(D) Subtask/aggregate executes and subprocess finishes

initial(f)--> subprocess (f)
          --> [subtask(e)]--> aggregate--> result_promise--> ...

(E) Subtask/aggregate finishes and result_promise is fulfilled

initial(f)--> subprocess (f)
          --> [subtask(f)]--> aggregate(f)--> result_promise(f)--> ...

mrocklin commented 6 years ago

This quickly overloads the scheduler when there are a large number of potentially-concurrent logical tasks to execute

Can you expand on how the scheduler became overwhelmed? Are you familiar with secede/rejoin? http://dask.pydata.org/en/latest/futures.html#submit-tasks-from-tasks
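
For reference, the pattern from that page looks roughly like the following, where process stands in for whatever work the subtasks do:

from dask.distributed import get_client, secede, rejoin

def fan_out(inputs):
    client = get_client()              # client of the worker running this task
    futures = client.map(process, inputs)
    secede()                           # give this thread back to the worker's pool
    results = client.gather(futures)   # wait without occupying a task slot
    rejoin()                           # wait for a free slot before continuing
    return sum(results)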

asford commented 6 years ago

Yes. Sorry to muddle the example but I used the worker_client context manager to handle the long-running tasks. I've updated my previous comment to reflect this.

I'll be the first to admit it was a terribly naive setup, but I developed this issue while executing a group of 1-2 thousand of these "logical tasks" on a cluster of ~256 single-process workers. When running via worker_client the cluster ground to a halt as long-running tasks accumulated, and I was eventually unable to establish connections to the scheduler. My assumption, from looking through the logs, was that the scheduler node was being overwhelmed by the number of client connections, but I didn't directly verify that fact and may be incorrect.

I've resolved this issue in the short term by:

  1. Moving to a fixed number of work partitions per task, so that the task graph can be fully defined during setup. However, this is non-ideal in long-tail cases where the initial search results in a very large number of subtasks.
  2. Updating the underlying modeling software to accommodate multi-threaded execution, which dramatically reduces the number of worker connections.

Unfortunately solution (1) isn't particularly workable in the long term, as the end implementation for this task will likely require a branched search strategy in which subtasks may require recursive expansion to an arbitrary depth.

mrocklin commented 6 years ago

In theory the number of long-running tasks shouldn't have any impact on the number of actual connections to the scheduler; they all share the same single connection. There is a limit on the worker side, since each long-running task occupies a thread, but you should be OK with up to thousands of these per worker.

asford commented 6 years ago

That makes sense. I suspect I may have then been seeing a combination of effects due to the pathological use of hundreds of workers along with worker clients. Would opening the first client on the worker result in establishing an additional connection to the scheduler, in comparison to a worker with no clients?

mrocklin commented 6 years ago

Yes, each additional client would open up a couple of new connections. This might push you over the 1024 open file limit that is the default on most Linux distros. You might want to watch the number of open file descriptors on the system page of the dashboard to verify. You could also raise your open file limit (there are good instructions online for this).
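
As a quick check, the limit for the current process can be inspected (and raised up to the hard cap) from Python with the standard resource module; in practice most deployments raise it with ulimit or the service manager instead:

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("open file limit:", soft, "hard cap:", hard)    # soft is often 1024 by default

# Raise the soft limit to the hard cap for this process only
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))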

asford commented 6 years ago

Yes, as I vaguely alluded to above, the primary solution to this problem was reworking how our compiled extension modules handled locking to enable multithreaded workers. This, along with the ulimit modifications you recommend, resolved most of these issues.

On the original topic, coroutines, I'd be very much +1. This would be an elegant way to reduce the effective complexity of the current secede-based long-running task system, and it closely matches existing Python async patterns. I would imagine a system in which dask coroutines are executed on the worker thread pool, but can yield compute-able objects to an existing async Client.

If I could slightly update your example to illustrate this point:

@dask.delayed
def generate_data():
    return list(range(random.randint(0, 10000)))  # a list of data of unknown length

@dask.coroutine
def process_all():
    num_per_partition = 32
    work_bag = dask.bag.from_delayed([generate_data()])
    count = yield work_bag.count()
    total = yield (
        work_bag
        .repartition(max(1, count // num_per_partition))
        .map(inc)
        .sum()
    )
    return total

One concern with worker-based coroutines and long-running tasks is that they introduce implicit dependencies between tasks that are only captured in the state of the coroutine. For example, the process_all task result depends on the generate_data->count->inc->sum task graph, but the scheduler is unaware of this dependency, making the worker a single point of failure for this task execution. If the worker executing a given coroutine is lost then the subgraph is invalidated and must be recomputed.

adamklein commented 6 years ago

@mrocklin Thanks for posting your thoughts on the coroutine-style approach. It definitely makes sense from an ease-of-use perspective.

Regarding implementation possibilities ... would the @dask.coroutine annotation automatically secede the worker thread, as in using a worker_client context manager? Or, would it perhaps be able to follow a true coroutine approach without additional threads? E.g., one problem I've run into is needing to be careful with tasks that are not thread-safe: I wouldn't want a single-threaded worker pool to execute anything else while the @dask.coroutine is executing until actually hitting the yield statement.

Too bad a partially-evaluated coroutine is impossible to serialize and resume elsewhere. If we were using, e.g., stackless python, then you could even do the following: given a task A that has launched tasks B and C, you could capture the dependency of the continuation of A on B & C and reschedule it so that it respects data locality. (Too bad we live in the real world ...) Failing that, it would seem quite useful to follow a local-first scheduling policy whereby B & C are scheduled on the same worker as A, because we know A will consume their results. Then of course the scheduler might relocate B & C. However, perhaps there could even be a way to inform the scheduler not to move a task (if, e.g., you know it's going to generate large results). This is motivated by some of the same workflows that Ray seems geared toward (https://github.com/ray-project/ray). Not having played with it yet, I'm not clear on how they achieve their yield/resume semantics with tasks generating and awaiting other tasks.

mrocklin commented 6 years ago

Regarding implementation possibilities ... would the @dask.coroutine annotation automatically secede the worker thread, as in using a worker_client context manager?

I think that that's the kind of question that we would hammer out here. Thanks for bringing it up.

There seem to be a few options here (some of which you mention)

  1. The Dask coroutine runs in a separate thread and so can safely do computation in the coroutine itself. We secede when we yield and rejoin when we come back, leaving an extra thread waiting outside the main thread pool.
  2. The Dask coroutine is a normal Tornado coroutine. We can't safely do computation here, but we don't need to allocate extra threads.
  3. A combination of the two, where we run Dask coroutines in separate threads, but shelve the coroutine when yielding, allowing the thread to be used elsewhere. This would require us to write a bit of our own event loop logic (which is fine) but would get the best of both worlds (see the sketch below).

My guess is that 1 or 3 is more likely, with the decision being made by how much people care about having extra threads hanging around.
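
To make option 3 a bit more concrete, here is a rough sketch of the driver logic, run client-side for simplicity; drive is a made-up helper, and a real implementation would live in the worker and resume the generator on its thread pool rather than in a done-callback:

from concurrent.futures import Future as LocalFuture

def drive(gen):
    # Step a generator-based coroutine that yields lists of dask futures.
    # Between yields the generator is shelved -- no thread blocks waiting --
    # and it is resumed from a done-callback once its futures have finished.
    outcome = LocalFuture()

    def step(send_value):
        try:
            pending = gen.send(send_value)       # run until the next yield
        except StopIteration as stop:
            outcome.set_result(stop.value)       # coroutine returned
            return
        if not pending:                          # nothing to wait for
            step([])
            return
        remaining = [len(pending)]

        def on_done(_):
            remaining[0] -= 1
            if remaining[0] == 0:                # all yielded futures finished
                step([f.result() for f in pending])

        for f in pending:
            f.add_done_callback(on_done)

    step(None)
    return outcome

# Usage, with a generator that fans work out through an existing client:
#
#   def total(client, xs):
#       parts = yield [client.submit(lambda v: v + 1, v) for v in xs]
#       return sum(parts)
#
#   result = drive(total(client, range(10))).result()   # -> 55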

given a task A that has launched tasks B and C, you could capture the dependency of the continuation of A on B & C and reschedule it so that it respects data locality. (Too bad we live in the real world ...)

Yeah, my guess is that this would be tricky to do in general. We could ask some of the cloudpickle maintainers. They might have thoughts on how to serialize ongoing generators. cc @llllllllll

Failing that, it would seem quite useful to follow a local-first scheduling policy whereby B&C are scheduled on the same worker as A because we know A will consume their results. Then of course the scheduler might relocate B&C.

This would be a bigger thing to accomplish, but is generally in scope. There is nothing stopping us from making workers more clever and doing their own local task scheduling work, kicking tasks up to the global scheduler in times of need. This would require time and active feedback to get right, but certainly seems possible.
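
For what it's worth, a crude approximation of the local-first idea is possible today with worker restrictions: a task can look up its own worker's address and ask that the subtasks it submits prefer that worker. This is only a placement hint plus a restriction, not a real local-first policy, and task_b/task_c below are placeholders:

from dask.distributed import get_client, get_worker, secede, rejoin

def task_a(x):
    client = get_client()
    here = get_worker().address               # address of the worker running A
    # Prefer scheduling B and C where A runs, since A will consume their
    # results; allow_other_workers lets the scheduler move them if needed.
    b = client.submit(task_b, x, workers=[here], allow_other_workers=True)
    c = client.submit(task_c, x, workers=[here], allow_other_workers=True)
    secede()
    results = client.gather([b, c])
    rejoin()
    return results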

llllllllll commented 6 years ago

They might have thoughts on how to serialize ongoing generators.

I have an idea for CPython, but I don't know if you are going to like it. I can push a prototype later tonight.

llllllllll commented 6 years ago

Here is my "prototype": https://github.com/llllllllll/cloudpickle-generators