dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 719 forks source link

Forget provenance #809

Open mrocklin opened 7 years ago

mrocklin commented 7 years ago

Iterative algorithms can have very long histories. Maintaining how we obtained final results in the scheduler can become very expensive.

x = 1
for i in range(100000):
    x = c.submit(inc, x)

wait(x)

In this case x only takes a few bytes in memory, but it's history in the scheduler takes several megabytes.

In cases like these it might be worthwhile to explicitly replace how we got x with a simple value, similar to what happens when we scatter data. This destroys options for resilience (if the worker goes down then we are unable to reproduce the value) but may still be useful.

x = c.forget_provenance(x)  # TODO: need better name

This would be helpful with iterative algorithms such as occur in dask-glm (cc @moody-marlin)

mrocklin commented 7 years ago

This might also be a keyword in persist

LunarLanding commented 4 years ago

Hi, this is of interest to my use case. How would I go about implementing? Any pointers?

PS: started hacking a solution that works for now. Might need more checks, I will soon find out. Based on code in distributed.scheduler, specifically _propagate_forgotten.

import dask
def task_forget_dependencies(ts,recommendations):
    key = ts.key
    for dts in ts.dependencies:
        dts.dependents.remove(ts)
        s = dts.waiters
        assert(ts not in s)
        #s.discard(ts)
        if not dts.dependents and not dts.who_wants:
            # Task not needed anymore
            assert dts is not ts
            recommendations[dts.key] = "forgotten"
    ts.dependencies.clear()

def scheduler_forget_dependencies(dask_scheduler=None,*,key):
    ts = dask_scheduler.tasks.get(key,None)
    if ts is None:
        return 'not found'
    assert(ts.state in ('memory','released'))
    recommendations = {}
    task_forget_dependencies(ts,recommendations)
    dask_scheduler.transitions(recommendations)
    return recommendations

def dask_get_key(obj):
    dep = obj.dask.dependencies
    assert(len(dep)==1)
    key_like = list(dep.keys())[0]
    return f"('{key_like}',)"

def get_current_client():
    return dask.distributed.Client.current(allow_global=True)
def future_forget_dependencies(fut):
    get_current_client().run_on_scheduler(scheduler_forget_dependencies,key=fut.key)

def dask_schedule_forget_dependencies(obj):
    key = dask_get_key(obj)
    fut = dask.distributed.Future(key=key)
    fut.add_done_callback(future_forget_dependencies)

Usage

dask_obj = ...
dask_obj.persist()
dask_schedule_forget_dependencies(dask_obj)
mrocklin commented 4 years ago

@LunarLanding in principle it looks like you're off to a good start. Your approach of manipulating scheduler state directly looks good to me. I'm sure that there will be some corner cases to think about :)

As a next step, I encourage you to turn these functions into methods on the Scheduler class itself and then make a test for it, maybe like the following:

@gen_cluster(client=True)
def test_forget_history(c, s, a, b):
    x = 1
    for _ in range(10):
        x = c.submit(inc, x)
    assert len(s.tasks) == 10

    s.forget_history(keys=[x.key])
    assert len(s.tasks) == 10

    result = await x  # same result as before
    assert result == 10

At this point, I would make a pull request so that it's easier for others to play with the implementation.

Then, after that I would encourage you to make a client method that called this method on the scheduler. You might look at Scheduler.handlers for examples. You would then change your test so that the client called the method remotely.

@gen_cluster(client=True)
def test_forget_history(c, s, a, b):
    x = 1
    for _ in range(10):
        x = c.submit(inc, x)
    assert len(s.tasks) == 10

    await c.forget_history(keys=[x.key])
    assert len(s.tasks) == 10

    result = await x  # same result as before
    assert result == 10

After that we would probably try to stress the implementation to make sure that it worked well under a variety of cases.

You're off to a great start!