dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 717 forks source link

[Discussion] What if broken Client connections didn't release Futures at all? #5684

Open gjoseph92 opened 2 years ago

gjoseph92 commented 2 years ago

@graingert and I were discussing https://github.com/dask/distributed/issues/5667, and he raised a good question: why should we interpret a broken connection to a Client as an indicator that the Client no longer cares about its futures?

Currently, when the connection to a client breaks, the scheduler releases all its futures. This is clearly too aggressive—small network blips can cause you to lose all your work (https://github.com/dask/distributed/issues/5667). There's been discussion around adding a "grace period", where disconnected clients have some amount of time to reconnect before we release their futures.

But what if we went to the other extreme, and said that unless a client explicitly tells us it's closing, we assume it still cares about its futures, regardless of whether it's around to see them? And whenever it reconnects (if it does), all state updates are synced?

This would of course solve #5667, but also would open the door to many other use-cases, which right now you'd have to solve with other libraries or Jupyter notebooks running on a server:

I have a feeling that the current client-disconnect-releases-Futures paradigm makes sense in an environment where many users are sharing one cluster. If someone disconnects, it's more neighborly to not let their Futures hang around forever. But I'm not sure whether multi-user clusters are really a common use-case of distributed anymore? With so many tools to create clusters on-demand (dask-cloudprovider, dask-gateway, pangeo, Coiled, Saturn Cloud, etc.), ephemeral, single-user clusters seem to me like the more typical pattern.

And when you have an ephemeral, single-user cluster, it's probably fine/a good thing if your futures stick around unless explicitly dropped:

Certainly, the implementation of this versus a grace period can be effectively the same (especially if you can say grace-period: inf). But maybe rethinking the semantics of disconnection vs the implementation will be helpful as we figure out what to do here.

cc @fjetter @jcrist @crusaderky @jrbourbeau

fjetter commented 2 years ago

But what if we went to the other extreme, and said that unless a client explicitly tells us it's closing, we assume it still cares about its futures, regardless of whether it's around to see them? And whenever it reconnects (if it does), all state updates are synced?

I consider this to be a duplicate of https://github.com/dask/distributed/issues/5667 with the difference that your proposal would never forget anything and I do not consider this a viable option. Not because of a multi-user setup but mostly because I do not know how we would ever reconstruct the state on client side. Sure, we can re-register the futures by key but most dask users are not interacting with dask futures but rather dataframes, arrays, etc. For me, allowing an automatic stateful reconnect only makes sense if the same python process, the same client instance is still around.

We do not clean up any state because of a connection failure (that's the technical cause) but because we think the remote is unexpectedly dead. I'm open to introducing an API which allows us to do this deliberately (and requires user interaction).

class Client
    def suspend_connection(self) -> Client.id:
        pass
    def resume_connection(self, id) -> dict[key, Future]:
        pass

We do support this already but in a very obscure way via Variables

# Session A
fut = client.submit(f)
var = Variable("my-future")
var.set(fut)
# End Session A / Close client  / close laptop

# Session B - New Client
var = Variable("my-future")
fut = var.get()
fut.result()

and I think a more straight forward way for plain futures would be valuable. However, this would only serve a relatively small subset of users. For the generic case, the reconnect is the most viable option.

Below a few thoughts about the use cases you mentioned

You live in a rural area with a bad ISP; after submitting a large job to a cluster, your connection goes down for 30min

I don't think this is the user base we should optimize for. At least not if it requires significant changes or even more code to maintain. If a client process can reconnect statefully after a grace period, this case would be covered. Set it to 1h and you're good (assuming the above case of same-process/client reconnect)

You submit some work from your laptop at a café, but they're closing early. You'd like to go to a different café, but if you close your laptop, your tasks will be cancelled. Too late to learn how to change some client-disconnect-grace-period on the fly on the scheduler! You want to submit a large job on Friday afternoon that ends in a to_parquet writing to S3. You don't need the results of the tasks; you're just running them for their side effects. You want to fire-and-forget the tasks without having to learn about fire_and_forget because it's a Friday afternoon and nobody wants googling and reading documentation to be the thing between them and their weekend.

Both of these cases make the case that some event happens and a user lacks education about how dask works and has insufficient time to find out. I agree that this is a big factor for UX but I am not convinced that these two cases are representative. Also, since this is partially about education, that might be a documentation problem as well.

crusaderky commented 2 years ago

You submit some work from your laptop at a café, but they're closing early. You'd like to go to a different café, but if you close your laptop, your tasks will be cancelled. Too late to learn how to change some client-disconnect-grace-period on the fly on the scheduler!

This is one of the reasons why published datasets exist. https://distributed.dask.org/en/stable/publish.html

While in theory I would feel mildly warm at the idea of extending the publish/unpublish design to the whole state of a Client, there's the important detail that a client doesn't always track collections. Namely, it does when you invoke compute(), but it doesn't when you invoke persist() (the client will only keep track of the individual keys). I'm unaware of a one-button "pickle session" for jupyter, but I may have missed.

gjoseph92 commented 2 years ago

For me, allowing an automatic stateful reconnect only makes sense if the same python process, the same client instance is still around.

I'm not proposing otherwise. In our original discussion, @graingert actually suggested that the trigger for forgetting a disconnected client's futures could just be that a new client instance had connected, which would be very reasonable in single-user clusters. I just simplified that idea here.

I'm in no way suggesting that we support reconnecting across different Python sessions. That's both much less useful IMO, and way harder.

I'm just saying, why should the client-reconnect grace period be only temporal? Why should time be the deciding factor, if the scheduler already has an idle timeout? For single-user clusters, I don't think time-since-disconnect is a good indicator of whether users want their futures to be cancelled or not, when there are better indicators of that intention:

Of course, we'd still need to support setting a temporal grace period for multi-user clusters / clusters that are the backend for a web API / etc.

Basically, I'm proposing

Client(..., reconnect_grace_period: str | Literal["inf"] = "1min", remove_orphaned_tasks: bool = False)

right now, with the defaults changing to the more user-friendly

Client(..., reconnect_grace_period: str | Literal["inf"] = "inf", remove_orphaned_tasks: bool = True)

after a deprecation period, to avoid breaking changes for multi-client users.

remove_orphaned_tasks would be sent to the scheduler upon connection. If True, the scheduler would release all keys held by clients that were not currently connected.