dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Active Memory Management Control System #4982

Open mrocklin opened 3 years ago

mrocklin commented 3 years ago

System to Control Active Memory Management

There have been a lot of side conversations around active memory management, but I don't think we've ever written this up properly (aside from maybe https://github.com/dask/distributed/issues/1002, which I'm now closing)

Dask currently replicates data around the cluster whenever a dependency on one worker needs to be moved for a dependent on another worker. This passive approach is good because highly needed data tends to spread around, but bad because excessive copies may start to take up distributed memory.

Motivating use cases

There are many situations where we want to more actively control the management of data on the cluster. I'll list a few below:

  1. Data that we know will be in high demand should be replicated proactively.

    Demand might come either from many not-yet-run dependents, or from wanting insurance against workers that might go down

  2. Data that has been replicated may no longer be necessary and it may be a good idea to clear it out

  3. When we shut down a worker we want to offload its data before retiring it

  4. If we're able to reliably shut down workers then we would like to cycle their processes to avoid issues like memory leaks

  5. When a worker is running low on space we may want to offload its data rather than sending it to disk

  6. When new workers arrive they should probably proactively pull data away from some of their more saturated peers

  7. Advanced users will want to define their own special policies

  8. The user may manually specify replication requests for any of the reasons above

  9. ...

Challenges

However, making a general purpose system that can solve all of these problems (and more) is hard for a few reasons. I'll list a few below:

  1. Efficiency: We need to efficiently and quickly identify what data should move where, even in cases where we have millions of tasks and thousands of workers
  2. Consistency: Moving this data is error-prone while the cluster is active. It requires a subtle dance between the scheduler, possibly multiple workers, and maybe other active policies
  3. Customizability: We are not smart enough to predict the right approach here and implement a fully general solution on the first try. Instead we probably need to make a system that allows for a variety of overlapping user-defined policies.

Existing work on rebalance logic

There is currently some work from @crusaderky on a special but important case of this problem: making rebalance faster and more robust. The idea is that this is a good and general first step towards a larger system.

This is good work because it solves a tricky problem of how to quickly identify data that should be moved, and good targets to accept this data. It also starts to think about robust movement of data.

This is one example of logic that folks might want to implement

Proposed Control System

At the same time, I'd like for us to start thinking about a control system for these policies. I'll outline something below to serve as a first pass. This is the simplest system I could think of that can maybe solve the problems listed above.

We have a variety of memory management policies that are invoked periodically; they optionally have a finished criterion, which clears them from the system.
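
A rough sketch of what such a policy base class could look like (illustrative only; the class name, the update contract, and the finished event are assumptions rather than an existing API):

import asyncio


class MemoryManagementPolicy:
    """Sketch of a base class: the control system calls update() every
    `period`; a policy signals that it is done by setting `finished`, at
    which point it is removed from the system."""

    period = "1s"  # how often the control system invokes this policy

    def __init__(self):
        self.scheduler = None            # filled in when the policy is registered
        self.finished = asyncio.Event()  # set by the policy when it is done

    def update(self) -> list:
        """Return a list of (action, worker_state, task_state) recommendations."""
        return []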

Example, retire workers method

So retire workers might look like the following:

async def retire_workers(self, workers):
    # register several new policies with the memory manager
    policies = [RetireWorkerPolicy(self, worker) for worker in workers]

    # wait until they are finished
    await asyncio.gather(*[policy.finished.wait() for policy in policies])

    # close workers now that data has been moved
    await asyncio.gather(*[self.close_worker(worker) for worker in workers])

These RetireWorkerPolicy objects would live in some plugin for a while until they finished. There might be a variety of other policies in there, some long-lived, some short-lived.

>>> scheduler.memory_manager.policies
[<RetireWorkers(tcp://..., status: waiting, period: 100ms)>,
 <RetireWorkers(tcp://..., status: waiting, period: 100ms)>,
 <RetireWorkers(tcp://..., status: waiting, period: 100ms)>,
 <RetireWorkers(tcp://..., status: waiting, period: 100ms)>,
 <WatchForExcessMemory(period: 1s)>,
 ...
 <Replicate(task: sklearn-model-123, replicas: 3, period: 500ms)>
 ...
 ]

Actions

Each policy, when invoked, emits a set of proposed actions.

Actions can be one of two forms:

  1. Ask a worker to acquire a piece of data
  2. Ask a worker to delete a piece of data, if it agrees

Note that we don't merge these two, as we did in replicate/rebalance. That approach is error-prone and requires careful attention. I think that by separating these two we can achieve what we want with more stability.

Acquire data

Workers already have a solid way to acquire a piece of data in a noisy cluster. They use this today to gather dependencies. Let's reuse that logic.

However to avoid consistency issues I suspect that we're going to need to keep track of what data each worker is currently trying to obtain. Otherwise, for data that takes a long time to acquire, we might mistakenly ask many workers to get the same piece of data.

To address this I think that we could start tracking not only what data each worker has, but what data it is trying to get. We would also have to track the same information on the task. This would be a dynamic version of TaskState.who_has and WorkerState.has_what. Something like TaskState.who_is_acquiring and WorkerState.collecting but with better names.
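
For illustration, the extra bookkeeping could be as small as one additional set on each side (attribute names are placeholders as noted above, and the real TaskState / WorkerState classes carry many more fields):

class TaskState:
    def __init__(self, key):
        self.key = key
        self.who_has = set()           # workers holding this data (exists today)
        self.who_is_acquiring = set()  # workers currently fetching it (new; placeholder name)


class WorkerState:
    def __init__(self, address):
        self.address = address
        self.has_what = set()   # tasks held in memory (exists today)
        self.acquiring = set()  # tasks being fetched right now (new; placeholder name)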

Remove data

Similarly with adding data ...

  1. We already have a system for the scheduler to encourage workers to remove data
  2. Things may change in the meantime, and workers may disagree (maybe they were just recently assigned a task that requires the piece of data as a dependency).

Unlike with adding data, I suspect that we can do this without adding extra state to the workers or tasks. I suspect that we can ask the worker to remove a piece of data, and then immediately mark that data as removed so that future scheduling decisions won't depend on it. If for some reason the worker disagrees then it can send back its usual "I now have this data" message and things may be ok.
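
A minimal sketch of that optimistic bookkeeping on the scheduler side (the remove_data op and send_to_worker call mirror the pseudocode later in this issue; none of this is an existing API):

def request_removal(self, ws, ts):
    # Fire-and-forget: ask the worker to drop its replica ...
    self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})
    # ... and immediately forget that replica so that future scheduling
    # decisions no longer rely on it.
    ts.who_has.discard(ws)
    ws.has_what.discard(ts)
    # If the worker disagrees, it sends back its usual "I now have this data"
    # message, which re-registers the replica through the normal path.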

Example, Replicate policy

Then, something like replicate might look like the following:

class Replicate(MemoryManagementPolicy):
    period = "1s"

    def __init__(self, task: TaskState, replicas: int):
        super().__init__()
        self.task = task
        self.replicas = replicas

    def update(self):
        if self.task.key not in self.scheduler.tasks:
            self.finished.set()
            return []

        count = len(self.task.who_has) + len(self.task.who_is_acquiring)
        if count < self.replicas:
            ws = self.scheduler.get_worker_with_lots_of_memory()
            return [("get", ws, self.task)]  # some other return convention is probably better
        elif count > self.replicas:
            ws = ...  # find the most saturated worker
            return [("remove", ws, self.task)]  # some other return convention is probably better
        return []

Some things to note here:

  1. We're not doing the work to move the data around. We're going to trust the system to do this. I hope that the system can be relatively simple, given the simple operations that we have.
  2. This policy closes itself off once the task has left the scheduler.
  3. We don't try to set every possible worker change in one go. We could if we wanted to, by returning multiple actions, but we don't have to. We're making the world a bit better. This system will be called again shortly, so we don't need to be super-aggressive here. We just need to improve the world a bit.

Control loop

I am hopeful (probably naively so) that the control loop can be fairly simple. I hope that the following code gets my point across

class ActiveMemoryManager(SchedulerExtension):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.periodic_callbacks = {}

    def add_policy(self, policy: MemoryManagementPolicy):
        policy.scheduler = self.scheduler

        pc = PeriodicCallback(
            functools.partial(self.run, policy=policy),
            callback_time=parse_timedelta(policy.period) * 1000,
        )
        pc.start()

        self.periodic_callbacks[policy] = pc

        asyncio.ensure_future(self.cleanup(policy))  # TODO: avoid dangling futures

    def run(self, policy):
        recommendations = policy.update()
        for action, ws, ts in recommendations:
            if action == "get":
                self.send_to_worker(ws.address, {"op": "gather_data", "keys": [ts.key]})
            elif action == "remove":
                self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})

    async def cleanup(self, policy):
        await policy.finished.wait()
        pc = self.periodic_callbacks.pop(policy)
        pc.stop()

crusaderky commented 3 years ago

Actions can be one of two forms: Ask a worker to acquire a piece of data Ask a worker to delete a piece of data, if it agrees Note that we don't merge these two, as we did in replicate/rebalance.

This has the benefit of simplicity; however, it means that all rebalanced keys will be temporarily duplicated for several seconds, until the next rebalance pass. There should be a "move" action that immediately deletes a key as soon as a copy is confirmed to be successful.
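
One way to approximate such a "move" with the two primitives above is a short-lived policy that acquires the new copy first and only drops the old one once the scheduler sees the copy confirmed (a sketch building on the pseudocode in the issue description; not an existing API):

class Move(MemoryManagementPolicy):
    period = "100ms"

    def __init__(self, task, donor, recipient):
        super().__init__()
        self.task = task
        self.donor = donor
        self.recipient = recipient

    def update(self):
        if self.recipient not in self.task.who_has:
            # Step 1: ask the recipient to fetch a copy.
            return [("get", self.recipient, self.task)]
        # Step 2: the new copy is confirmed; drop the donor's replica and finish.
        self.finished.set()
        return [("remove", self.donor, self.task)]

This still leaves the key duplicated for up to one policy period, but that period can be much shorter than a full rebalance pass.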

fjetter commented 3 years ago

How do we manage competing interests between policies?

Simple example using some of the policies you mentioned. We have a Replicate policy running that tries to keep data X duplicated on the cluster, and a WatchForExcessMemory policy which I assume would observe the cluster and try to eliminate excessive replicas. If X is very large, the latter would decide to reduce the number of replicas. Who wins?

I'm reasonably confident that we can construct more elaborate scenarios with the potential to cause instabilities.

Additional action: Pause task execution / block / throttle?

To manage the peak number of replicas we might want to explicitly block execution of a task's dependents to avoid unnecessary memory pressure and control the overall number of replicas. This would probably require additional scheduler state, but would otherwise work with the proposed basic actions.

mrocklin commented 3 years ago

This has the benefit of simplicity; however, it means that all rebalanced keys will be temporarily duplicated for several seconds, until the next rebalance pass. There should be a "move" action that immediately deletes a key as soon as a copy is confirmed to be successful.

We're going to have a copy around for a bit anyway. If it's simpler / more robust to leave things around for slightly longer (presumably the time of another cycle of this callback + coordination time) then that sounds like a good tradeoff to me. If this thing is running in the background all the time then waiting a few seconds sounds cheap to me.

mrocklin commented 3 years ago

How do we manage competing interests between policies?

I think that short-term we probably don't worry about it / we think about it outside of the codebase.

Longer term maybe the control system watches for signs of churn and raises warnings if so?

mrocklin commented 3 years ago

Additional action: Pause task execution / block / throttle?

I'm inclined to keep this out of scope for now. I think that this pattern of having various independent systems run on the scheduler, inspecting state, and making changes, is a good one though. I could imagine another that pauses workers?

mrocklin commented 3 years ago

I'm going to try to lay out a set of steps that might enable an experiment here, with the goal of removing replicated data from the cluster after it is no longer necessary. Some steps:

Add and remove data

First we probably want to be able to ask workers to add and remove data. We already have this logic built out, but it isn't immediately accessible to the scheduler. We'll need to fix that by providing stream handlers.
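
Concretely, that probably means new entries in the worker's stream-handler table that forward to the existing machinery (a sketch; the op names follow the pseudocode below, and the handler methods are placeholders that don't exist yet):

# Inside Worker.__init__ (sketch): route new scheduler -> worker messages
# to the existing data-movement logic.
self.stream_handlers.update({
    "gather_data": self.handle_gather_data,  # placeholder: reuse dependency-gathering logic
    "remove_data": self.handle_remove_data,  # placeholder: reuse key-releasing logic
})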

Some questions:

  1. What operation do we use to ask a worker to free a piece of data? Is it free_keys or superfluous_data perhaps?

These two steps are probably testable in isolation

@gen_cluster(client=True)
async def test_gather_dep(c, s, a, b):
    x = await c.scatter(123, workers=[a.address])

    b.collect_key(x.key)  # hypothetical new worker method
    while x.key not in b.data:
        await asyncio.sleep(0.01)

    a.free_keys(keys=[x.key])
    while x.key in a.data:
        await asyncio.sleep(0.01)

(my apologies for the white-box test, this is mostly for the developer who writes this up, rather than future testing.)

This is likely to require/build understanding of both how the workers/scheduler use BatchedSends to communicate with small administrative messages, and also knowledge of the Worker's state machine / TaskState handling.

Control system

Then, we can use these to build a SchedulerExtension that holds policies and enacts transfers. This was mostly implemented above, but I'll include a copy here for convenience.

class ActiveMemoryManager(SchedulerExtension):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.periodic_callbacks = {}

    def add_policy(self, policy: MemoryManagementPolicy):
        policy.scheduler = self.scheduler

        pc = PeriodicCallback(
            functools.partial(self.run, policy=policy),
            callback_time=parse_timedelta(policy.period) * 1000,
        )
        pc.start()

        self.periodic_callbacks[policy] = pc

        asyncio.ensure_future(self.cleanup(policy))  # TODO: avoid dangling futures

    def run(self, policy):
        recommendations = policy.update()
        for action, ws, ts in recommendations:
            # These are the operations that we built in the last section
            if action == "get":
                self.send_to_worker(ws.address, {"op": "gather_data", "keys": [ts.key]})
            elif action == "remove":
                self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})

    async def cleanup(self, policy):
        await policy.finished.wait()
        pc = self.periodic_callbacks.pop(policy)
        pc.stop()

On its own this is hard to verify that it works or is helpful, so we need a policy. I'm going to suggest the following policy that removes excessive copies. In particular my recommendation is that we find tasks that are held on more workers than they have pending dependents. So if a task's data is still needed by 100 pending dependents then we're probably not going to remove anything. If it is no longer needed for any dependent then we should start removing some replicas.

In order to be more efficient when going through tasks we might instead go through workers and then go through the tasks that are currently in memory

class ReduceReplicas(MemoryManagementPolicy):
    period = "1s"

    def __init__(self):
        pass

    def update(self):
        recommendations = []
        for ws in self.scheduler.workers.values():
            for ts in ws.has_what.values():
                if len(ts.who_has) > len(ts.waiters):  
                    donor = decide_good_worker_to_remove_task(...)
                    recommendations.append(("remove", donor, ts))
        return recommendations

This seems like a policy that should be easy to implement and actually useful. We could get more clever by doing things like only running this on workers that are somewhat overburdened with memory, doing it only a few workers at a time in order to keep things lively, etc.. Those are all probably future work though.
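
Wiring this into the extension sketched above would then just be (illustrative; assumes the extension is installed as scheduler.memory_manager, as in the repr listing earlier):

# Register the policy; its update() then runs on the extension's periodic callback.
scheduler.memory_manager.add_policy(ReduceReplicas())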

Summary

I think that the only tricky thing here is taking the existing communication system that the workers use to gather dependencies and release data, and exposing it as an operation that the scheduler can use.

Then I think that it's mostly code plumbing to make a scheduler extension, make some policy class, set up periodic callbacks, etc.. This stuff should be somewhat straightforward. I don't think that any creativity is necessary here.

The reduce replicas policy that I present above can probably be improved in several ways, and that's another place where creativity might be warranted, but I think that a brute force approach might do ok for a while.

mrocklin commented 3 years ago

Also, for a general purpose test, I think that something like the following could work:

@gen_cluster(client=True)
async def test_clear_copies(c, s, a, b):
    x = da.random.random((1000, 1000), chunks=(100, 100)).persist()
    y = await c.compute(x.dot(x.T).sum())

    # check that copies are eventually cleared out
    while any(len(ts.who_has) > 1 for ts in s.tasks.values()):
        await asyncio.sleep(0.1)

I think that eventually this test may fail, as we decide to accept some copies, especially when we have a lot of excess memory, but it should serve during a proof of concept phase.

crusaderky commented 3 years ago
    def update(self):
        recommendations = []
        for ws in self.scheduler.workers.values():
            for ts in ws.has_what.values():
                if len(ts.who_has) > len(ts.waiters):  
                    donor = decide_good_worker_to_remove_task(...)
                    recommendations.append(("remove", donor, ts))
        return recommendations

This seems like a policy that should be easy to implement and actually useful. We could get more clever by doing things like only running this on workers that are somewhat overburdened with memory, doing it only a few workers at a time in order to keep things lively, etc.. Those are all probably future work though.

It's going to be very expensive to do this full scan of all the in-memory keys in the cluster once you start having 100k+ such keys. The recent O(1) rebalance() PR gives you a good measure of how bad it would get. I doubt that optimization can be left to future work. It would be a lot more efficient if changes to ts.who_has, ws.has_what, and ts.waiters were handled by some function that internally flags the task for replica reduction.
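
For illustration, such a function could maintain an incrementally-updated set of over-replicated candidates, so the policy only scans that set instead of every in-memory key (a sketch that assumes all mutations of who_has / has_what / waiters go through one choke point, which is not the case today):

class ReplicaIndex:
    """Sketch: incrementally track tasks with more replicas than waiters."""

    def __init__(self):
        self.over_replicated = set()

    def on_replica_change(self, ts):
        # Called from the (hypothetical) single place where ts.who_has,
        # ws.has_what, or ts.waiters are mutated.
        if len(ts.who_has) > len(ts.waiters):
            self.over_replicated.add(ts)
        else:
            self.over_replicated.discard(ts)

A ReduceReplicas-style policy would then iterate over over_replicated rather than over every key of every worker.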

mrocklin commented 3 years ago

I agree that data structures that index tasks by replica count would make this faster.

I doubt that optimization can be left to future work

I agree that we shouldn't make this on-by-default until we resolve this. I do think that we can get a proof of concept merged in with opt-in behavior before then though.

Some other options for remediation that came to mind:

  1. Iterate through workers in order of memory pressure, stop once workers are not under pressure
  2. Increase the period to some longer time cycle, in the seconds range
  3. The control system might, in the future, track the time taken by each policy's update method and lengthen that policy's period accordingly (see the sketch after this list). It might also choose to skip certain policies if it is under CPU pressure (we do this currently for reevaluate_occupancy)
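
A sketch of option 3, timing each update call and stretching the callback interval when it gets expensive (the 5% budget and the doubling are made up for illustration; this assumes Tornado's PeriodicCallback picks up changes to callback_time between ticks):

import time


def run(self, policy):
    # Variant of the run() method above: measure update() and back off the
    # period of policies that become expensive.
    start = time.perf_counter()
    recommendations = policy.update()
    duration = time.perf_counter() - start

    pc = self.periodic_callbacks[policy]
    if duration * 1000 > 0.05 * pc.callback_time:  # callback_time is in milliseconds
        pc.callback_time *= 2

    for action, ws, ts in recommendations:
        if action == "get":
            self.send_to_worker(ws.address, {"op": "gather_data", "keys": [ts.key]})
        elif action == "remove":
            self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})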

So in general I agree that this is an issue, but it's an issue with enough solutions that I'm ok ignoring the issue for now, knowing that we have a path (or several paths) to resolve it in the future. I think that we should build a simple system first before investing the time for the full solution (which we might find was wasted)

mrocklin commented 3 years ago

Looking a bit more closely at the Worker, I realize that we now have a new state machine that wasn't there the last time I looked. My hope is that the signal from the scheduler to the worker can just say "make a new dependent with this key and set it on a path to be collected" and then have all the rest of the machinery work. It probably makes sense to wait for @fjetter to get back and comment on the right way to ask a worker to collect data before going forwards.

fjetter commented 3 years ago

Indeed the worker state machine is a bit weird at the moment. I looked over the gather_dep functionality once more with this use case in mind, and the worker is not really built for it.

Registering a task

The only way to properly register a task at the moment is via the Worker.add_task method. This is a rather long and complex method since it tries to account for all possibly pre-existing states to perform a state transition. More aptly this method should be called register_execute_task, since it is used to let the worker know about a task it is supposed to compute. It adds enough metadata for the worker to continue without the scheduler's intervention. It receives a key and constructs the TaskState in such a way that it will be executable. The task's dependencies are submitted via who_has. There is currently no way to make the worker aware of a "disconnected" dependency (disconnected in the sense that the worker never knows about the full graph).

We could easily provide another handler to register a dependency and construct the respective task state. That's what happens in add_task anyhow and we could reuse that logic probably, see https://github.com/dask/distributed/blob/cbcec9cdc020a36c024d55c216d5391c7b09ccae/distributed/worker.py#L1619-L1659

Actual collection

This is where it gets tricky, since the "path to be collected" is usually handled by the ensure_communicating method, which is run at a few selected points in the code base (for instance it runs on every handle_comm and after a few transitions; it could instead be a PeriodicCallback). It isn't built for this use case: it always assumes there is an executable dependent for every key we're supposed to fetch.

In pseudocode this looks like the following:

while self.data_needed:
    key = self.data_needed.pop()  # This is a key to-be-executed, i.e. a runnable task
    deps_to_fetch = get_dependencies_not_in_memory(key)

    while deps_to_fetch:
        dep = deps_to_fetch.pop()
        worker = select_good_worker(dep)
        # now that we picked a worker, we'll see if that worker holds any more dependencies
        # for any task in data_needed, to reduce comm overhead
        more_dependencies = self.select_keys_for_gather(worker, dep)
        transition_all_to_fetch(more_dependencies)

Actually fetching the data

The actual fetching of the data is done in Worker.gather_dep (scheduled as an asyncio Task by ensure_communicating). Assuming the task was properly registered on the worker (exists in Worker.tasks, has a who_has attribute, etc.) this should work on the happy path for keys without dependents. I am aware of a few error handling situations which are sensitive to whether or not there are dependents, but that should not be a showstopper. In particular the error handling cases are still a bit unstable and we'll need to touch that area anyhow soon (probably now, likely connected to another reported deadlock).

I think we should try reusing gather_dep for this use case, since I could easily see race conditions where a key is fetched for the sake of replication but, during this fetch, the worker is assigned one of the key's dependents to compute. Worker.gather_dep is the only place where we'd currently handle the follow-up state transitions properly. These kinds of race conditions were a big part of the deadlocks I debugged recently.

Regarding deletion

what operation do we use to ask a worker to free a piece of data? Is it free_keys or superfluous_data perhaps?

free_keys is currently a handler designed for the "delete without hesitation" option, and this is in fact not what we want to do. The handle_superfluous_data is more in line with what we're trying to do, since its current use case is to delete an unexpected replica from the cluster. Its only invocation is in the case where a worker registers an in-memory key on the scheduler even though the key is not supposed to be in memory. This is not 100% what we want to do here, but close.

The safety guard in superfluous_data is that it is forbidden to delete the key if the scheduler instructed it to be computed (there is the attribute scheduler_holds_ref). This protects us, in worker-failure and rescheduling scenarios, from forgetting and deleting to-be-computed keys. I'm not 100% sure if this is still a problem since I wasn't able to produce a "black box test" for this scenario.

On top of this, I believe workers should be allowed to reject a delete request if it requires the key itself, i.e. if a dependent is to be executed. This is a very simple but probably stable option we could start with. It could later be refined to "the key will not be needed for the next X seconds, therefore release it and collect it later again" if necessary. From a stability point of view, I hope this is the only case where we actually need to reject a delete request.


Summing up, the biggest problem I see is the gather_dep / data_needed mechanism and I'm inclined to say this should be rewritten. I wouldn't feel comfortable with a parallel system due to the fragile state machine. If we were to go for a parallel fetch implementation I would prefer us picking up https://github.com/dask/distributed/pull/4772 again since the intention there is to "formalize" the state machine such that the bulk logic resides in the transition functions and not outside the state machine in various methods. I would need to look into the PR since after I abandoned it a lot changed (for the better) but the ideas should still work.

fjetter commented 3 years ago

Last note about the gathering: We do have a mechanism right now to gather keys on the worker side which circumvents the ordinary state machine and instead is an explicit RPC call, namely the gather comm handler (Worker.gather). This is much simpler and does not deal with busyness, retries, missing data, metrics collection, etc. For a simple mechanism to start with, this might be sufficient (that's what the current rebalance/replicate is built on)
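
For reference, using that handler from the scheduler amounts to roughly the following per recipient (a sketch of what rebalance/replicate already do; retries and error handling omitted):

async def replicate_one(scheduler, ts, recipient):
    # Ask `recipient` to pull the key directly from its current holders via
    # the existing Worker.gather comm handler (plain RPC, outside the worker's
    # state machine).
    who_has = {ts.key: [ws.address for ws in ts.who_has]}
    await scheduler.rpc(addr=recipient.address).gather(who_has=who_has)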

mrocklin commented 3 years ago

To try to summarize, there are maybe three options

  1. Move all logic into the state machine, then adding this is easy
  2. Sidestep the state machine entirely and use the current gather option. Easy to do, but it might not be entirely stable.
  3. Use gather_dep and ensure_communicating, adding some logic so we check both data_needed as well as some new collection. A middle ground and stable solution, but not entirely trivial, and it adds inertia to the eventual state machine rewrite.

Is that a correct way of breaking things down? If so, do you have a recommendation or preference?

fjetter commented 3 years ago

Is that a correct way of breaking things down

Yes that captures it nicely.

If so, do you have a recommendation or preference?

I somehow dislike option 3 since I consider the worker too complicated already. A big problem I'm having with the code lately is that there are many handlers/methods which are doing five different things with subtle differences. Knowledge and understanding of these subtleties is lost after a while, and engineering becomes forensics.

Now that we have some kind of (almost deadlock free) stability I would like to pick up my draft of a state refactoring to see where we are. I can tell more after I spent an afternoon with it and can estimate how much work it is.

In the meantime we can start with 2.) and even work towards 1. simultaneously. One problem we can solve from the very start is trying to figure out what kind of stimuli are required from scheduler to worker. The stimuli we currently have are

If we add the necessary stimuli on worker side right away, even if the implementation doesn't give us all the required guarantees, the refactoring and AMM control system can evolve simultaneously without colliding, theoretically. IIUC the stimuli we require for the first few basic AMM policies are something like


def acquire_replica(self, comm, key, who_has):
    """The worker is instructed to fetch a replica of Task/Data `key` from `who_has`"""
    # pseudo
    worker = pick_worker(who_has)
    busy, data = get_data_from_worker_if_not_busy(key, worker)
    if not busy:
        self.put_key_in_memory(data)

def remove_replica(self, comm, key):
    """Forget data for the given key if it is not actively needed"""
    ts = self.tasks[key]
    if not any((d.state in ["executing", "ready", "waiting"] for d in ts.dependents)):
        self.release_key(key, report=True)

I think remove_replica should be identical to superfluous_data, but the state machine is currently very fragile, which is why this might start off slightly differently; we could eventually merge them once the task state machine stabilizes. acquire_replica would be more or less what we currently have for the get_data RPC, but as a stream handler.

mrocklin commented 3 years ago

Ideally we would want the acquire_replica handler to not require a who_has dict, and be able to figure things out if things break down. One nice thing about the current ensure_communicating system that we have today and that we would like to capture is that we know it is robust to a noisy cluster.

If we add the necessary stimuli on worker side right away, even if the implementation doesn't give us all the required guarantees, the refactoring and AMM control system can evolve simultaneously without colliding

OK, fair enough.

In the meantime we can start with 2.) and even work towards 1. simultaneously

Yeah, I'm trying to find a path through here that lets @crusaderky get back to work on this topic after the CI work (thanks again for that @crusaderky ), but that also leads towards a better future.

I would like to pick up my draft of a state refactoring to see where we are. I can tell more after I spent an afternoon with it and can estimate how much work it is.

OK, that sounds like a plan

fjetter commented 3 years ago

Ideally we would want the acquire_replica handler to not require a who_has dict,

Well, if the worker hasn't heard of that task before, how is it supposed to know where to find the data without who_has? I'm thinking that a worker might be instructed to pick up replicas for tasks it has never heard of before.

This who_has could of course be replaced by the worker calling the scheduler for a who_has. We do this occasionally, but I don't see any benefit in not submitting this information right away.

ensure_communicating system that we have today and that we would like to capture is that we know it is robust to a noisy cluster.

Of course. We wouldn't handle busy workers properly, nor would we be able to leverage synergies by batched get_data calls. Also the missing data handling would not be implemented. Since the replication is less critical and hopefully less frequent (at least tunable), I could live with these things short term.

mrocklin commented 3 years ago

Well, if the worker hasn't heard of that task before, how is it supposed to know where to find the data without who_has? I'm thinking that a worker might be instructed to pick up replicas for tasks it has never heard of before.

This who_has could of course be replaced by the worker calling the scheduler for a who_has. We do this occasionally but I don't see any benefits by not submitting this information right away.

I agree that we can submit something right away, but the worker should be able to reach back out on its own. The ensure_communicating system provides this. Workers will call scheduler.who_has to get up-to-date information if anything goes wrong.

Of course. We wouldn't handle busy workers properly, nor would we be able to leverage synergies by batched get_data calls. Also the missing data handling would not be implemented. Since the replication is less critical and hopefully less frequent (at least tunable), I could live with these things short term.

Yeah, fair enough.

What I'm hearing is that we have a path forward that is cheap and good-enough in the short term and that doesn't get in the way of more solid and robust work in the future.

mrocklin commented 3 years ago

Now that we have some kind of (almost deadlock free) stability I would like to pick up my draft of a state refactoring to see where we are. I can tell more after I spent an afternoon with it and can estimate how much work it is.

@fjetter checking in here. Do you have suggestions on the right way to proceed? I'm thinking about scheduling of @crusaderky's time. It seems like it is most time efficient to wait a bit to see if we can get the worker state machine rewrite in, and then maybe removing replicas becomes very easy. This is also a nice way to separate work between you and @crusaderky where you focus on enacting changes and @crusaderky focuses on specifying what changes should occur.

However, this also seems like the kind of project that could take a day, or could take several weeks. My guess is that we maybe have a couple more days while @crusaderky fixes up CI (again, thank you for your service Guido) but after that we should probably start on this work again.

So maybe my question is "how far can we get in the next couple of days?" and also "how long will fixing up CI take?"

crusaderky commented 3 years ago

CI is now done. I think it's sensible for me to start working with the current self.rpc system and only migrate to a state machine later on, in order to minimise the amount of collisions with @fjetter's work. I don't expect this will cause much overhead, and from a user's perspective you should be able to get the desired feature sooner (drop excessive replicas) at the cost of slightly more expensive comms. I don't expect to have to do twice the work for the part where the scheduler asks to drop a key but the worker decides it's not a good idea, because that part sounds nicely separated from how comms are performed.

crusaderky commented 3 years ago

@mrocklin

@fjetter and I had a chat. The agreement is that, in a first iteration, the system that deletes unwanted replicas will use Scheduler.rpc (what rebalance and replicate use now). This has been called "option 2" above in the discussion. This means that, as of the initial iteration, the deletion won't be safe to run in the middle of a computation. Also as of the initial iteration, the cost will be O(n) to the total number of tasks in the scheduler, as already discussed.

In the meantime, @fjetter will have the time to complete #5046. The key feature that's not in the PR yet is that, whenever the scheduler sends a fire-and-forget request for the deletion of a key and the worker decides NOT to delete it, the worker must report back its decision. On the scheduler side, TaskState will keep track of the pending deletion requests. This extra state is necessary to avoid either accidentally deleting all copies of a key or accidentally ending up with 2 or more replicas of the same key in perpetuity.
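
A sketch of what that scheduler-side bookkeeping could look like (the attribute, op, and handler names are placeholders, not the final design; this is the tracked variant of the optimistic sketch earlier in the thread):

# Placeholder attribute on the scheduler-side TaskState:
#   ts.pending_removals -- set of WorkerStates that have been asked to drop this key

def request_removal(self, ws, ts):
    if len(ts.who_has) - len(ts.pending_removals) <= 1:
        return  # never risk dropping what may be the last surviving replica
    ts.pending_removals.add(ws)
    self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})

def handle_removal_refused(self, worker=None, key=None):
    # The worker reported back that it kept the key; clear the pending request
    # so the scheduler doesn't keep waiting for a replica to disappear.
    self.tasks[key].pending_removals.discard(self.workers[worker])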

Once both of the above are done, it should be pretty much trivial to switch system (literally delete the 2 lines of self.rpc and replace them with the enqueueing into the bulk comms). This will be simple because already as of the initial design the whole thing won't need to wait for feedback from the workers. From a user's perspective, this will enable safely discarding unwanted key replicas in the middle of a computation.

As a separate discussion, we are both very concerned about how the policy plugin system is going to handle conflicting decisions between policies, and we would really like to see some real-life use cases of what a user-defined policy could want to achieve (at functional analysis level) before a design is finalised.

mrocklin commented 3 years ago

I think it's sensible for me to start working with the current self.rpc system and only migrate to a state machine later on, in order to minimise the amount of collisions with @fjetter's work

OK, sounds good to me.

The key feature that's not in the PR yet is that, whenever the scheduler sends a fire-and-forget request for the deletion of a key and the worker decides NOT to delete it, the worker must report back its decision. On the scheduler side, TaskState will keep track of the pending deletion requests. This extra state is necessary to avoid either accidentally deleting all copies of a key or accidentally ending up with 2 or more replicas of the same key in perpetuity.

I think that deletion is easier than replication. There are, I think, two approaches:

  1. When we send the delete signal, mark the replica as deleted from that worker immediately. Then if the worker disagrees have it send back the generic "I have this data now" signal. In the worst case this will cause the data to blink out and then back in, but that's annoying at worst, and shouldn't affect stability (I don't think so at least).
  2. When we send the delete signal, do not mark the replica as deleted; instead wait for the worker to send us a signal back. In this case then yes, I agree that we should keep track of pending deletions, which would require more state (but not that bad, I think)

I think that option 1 should be safe and easy to do.

As a separate discussion, we are both very concerned about how the policy plugin system is going to handle conflicting decisions between policies, and we would really like to see some real-life use cases of what a user-defined policy could want to achieve (at functional analysis level) before a design is finalised.

I agree that this could be fertile ground for concerns. Personally I don't think we're going to get a finalized design before trying a few things out. I think that we want to start with a simple system, see how it performs, and then adjust that system as we add in more complex policies.

crusaderky commented 3 years ago

When we send the delete signal, mark the replica as deleted from that worker immediately. Then if the worker disagrees have it send back the generic "I have this data now" signal.

Sounds good and robust to me.

crusaderky commented 3 years ago

Question for those knowledgeable about the various GPU libraries that are typically used on dask (@mrocklin please ping whoever appropriate).

If you have a hybrid cluster where some workers mount a GPU while others only have CPUs, is it ok for a rebalance operation to "park" a GPU object (e.g. a cupy array) on a CPU-only node? Or will it fail to unpickle? I think there will be different answers for different libraries and possibly different settings within the same library (e.g. with or without unified memory).

mrocklin commented 3 years ago

I recommend that we not worry about the heterogeneous case just yet. But the answer to your question is that typically it is not ok to park GPU data on a non-GPU machine.

In my mind the next thing to do here is to test the "we can just replace a few lines of code for async communication" hypothesis, and then if that works see if we can solve the retire workers problem (which is probably the second main use case of this)

quasiben commented 3 years ago

I agree with @mrocklin that worrying about the heterogeneous case should not be a primary concern. @madsbk has been thinking more deeply about heterogeneous computing recently and might have thoughts (though these might be better formulated in another week or two)

jakirkham commented 3 years ago

Just got back today from PTO and am catching up generally. Still need to read through this thread, but wanted to link a few potentially related issues for discussion.