dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Different behaviour in distributed vs single-machine scheduler: missing task #4750

Open maurosilber opened 3 years ago

maurosilber commented 3 years ago

I'm adding extra tasks on the optimizing step to save some task results to disk. The extra tasks are run on the dask single-machine scheduler, while they are not run on the distributed scheduler. It might not be a bug, but the intended behavior. In that case, how should have I done it?

Thanks!

Minimal Complete Verifiable Example:

Example 1:

import dask

def optimize(dsk, keys):
    print("Optimizing graph")
    dsk = dask.utils.ensure_dict(dsk)
    # Adding an extra task to the graph
    dsk["extra_task"] = (print, "Running extra task")
    return dsk

dask.config.set(delayed_optimize=optimize)

dask.delayed(print)("Running task").compute()

Running example 1 prints:

Optimizing graph
Running extra task
Running task

Example 2:

Start dask-scheduler and a dask-worker:

dask-scheduler & dask-worker localhost:8786 &

Add to example 1 the following:

import dask
import distributed

client = distributed.Client("localhost:8786")

def optimize(dsk, keys):
...

Example 2 prints "Optimizing graph" in the python process running the file, and "Running task" in the dask-worker process, but it doesn't print "Running extra task".

Environment:

New conda environment from environment-3.8.yaml. Cloned and "pip -e" installed dask and distributed, tags 2021.04.0.

fjetter commented 3 years ago

Looks like the optimizer function is not properly selected. I would try to very if the config was properly updated. You can also try calling dask.optimize yourself to troubleshoot the issue.

Note that the graph manipulation you are adding is actually not processing the output of any previous task but is rather an unconnected edge in your graph. Some of the default optimizers would cut this simply out (that's called culling)

However, from your description it sounds like the easiest solution would be to simply attach another task to the graph to persist the result.

@dask.delayed
def persist_result(result):
    do_smth_with_result(result)
    return result

@dask.delayed
def do_computation(...):
    return something

task = do_computation()
graph = persist_result(task)

graph.compute()

Any reason why you choose to go for the very low level optimizer?

maurosilber commented 3 years ago

Thanks for the fast reply!

Looks like the optimizer function is not properly selected. I would try to very if the config was properly updated. You can also try calling dask.optimize yourself to troubleshoot the issue.

There is an optimizing step running locally, as the print("Optimizing graph") runs in both cases. But, are there additional optimizing steps, such as culling, after sending the graph to a scheduler?

I did forget to build a minimum example that reflects my use case. What I'm trying to do is more or less the following.

With this starting graph, computed as delayed(func_c)(...):

starting_graph = {
  "a": (func_a, 1),
  "b": (func_b, "a"),
  "c": (func_c, "b"),
}

If the "b" result is already saved on disk, replace with a load on the optimizing step:

optimized_graph = {
  "b": (load_b,),
  "c": (func_c, "b"),
}

which includes a culling step to discard "a".

If it's not saved on disk, add a saving step:

starting_graph = {
  "a": (func_a, 1),
  "b": (func_b, "a"),
  "b-save": (save, "b"),
  "c": (func_c, "b"),
}

But, as the starting graph is generated by computing a delayed func_c, the only key requested is "c". "b-save" is not entirely disconnected, but ends up as a leaf node which is not computed on the distributed scheduler.

As a workaround, I think I could change the save function to return the input, and fuse "b" and "b-save".

Any reason why you choose to go for the very low level optimizer?

I went for the low-level optimizer as recomputing the same delayed(func_c) should return a different graph that loads the results.

fjetter commented 3 years ago

Is it possible for you to construct a minimal example? Input graph + optimizer function for your use case? I personally would probably still try to solve this problem without resorting to the optimizer but the optimization itself sounds still simple enough that it should work. Definitively, it shouldn't change depending on the execution backend.

But, are there additional optimizing steps, such as culling, after sending the graph to a scheduler?

I think not but I'm not 100% sure for high level graphs. @madsbk you have been involved with this, I believe. Can you answer this question or point to somebody who can?

maurosilber commented 3 years ago

Sure! It's a minimal example using a dict as storage.

import dask
import dask.utils

storage = {}

def save_and_return_value(k, v):
    storage[k] = v
    return v

def optimize(dsk, keys):
    dsk = dask.utils.ensure_dict(dsk)
    for k, v in dsk.items():
        save_key = f"save-{k}"

        if save_key in storage:
            # Load
            dsk[k] = (storage.get, save_key)
        else:
            # Save
            dsk[k] = (save_and_return_value, save_key, v)

    return dsk

dask.config.set(delayed_optimize=optimize)

@dask.delayed(pure=True)
def func(x):
    print("Running func")
    return 2 * x

x = func(42)

x.compute()  # Runs and saves to dict
x.compute()  # Loads from dict

It prints "Running func" only once, and the second time its replaced by the stored result.

I'm developing a package which uses this idea to store (some) results to disk: https://github.com/maurosilber/pipeline

madsbk commented 3 years ago

But, are there additional optimizing steps, such as culling, after sending the graph to a scheduler?

Yes if possible, optimizations such as culling and blockwise fusion will be done by the scheduler. Currently, this might not happen if low-level fusion is enabled but we are working on moving most, if not all, optimizations to the scheduler.

I don't think plugging into the optimizers are an optimal choice here. Maybe introduce a function that parse x.__dask_graph__() and inject load/save instructions instead of relying on the optimize infrastructure?

maurosilber commented 3 years ago

Do you mean something like:

def load_or_save(x):
    """Inject load/save instructions."""
   return ...

x = func(42)

load_or_save(x).compute()  # First time, runs and saves
load_or_save(x).compute()  # Second time, loads

I wanted it to be hidden from the user, as it could be misused as follows:

x = func(42)
x = load_or_save(x)

x,compute()  # Runs and saves
x.compute()  # Runs and saves again