dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 717 forks source link

Opportunistic Caching #681

Open mrocklin opened 7 years ago

mrocklin commented 7 years ago

Currently we clean up intermediate results quickly if they are not necessary for any further pending computation. This is good because it minimizes the memory footprint on the workers, often allowing us to process larger-than-distributed-memory computations.

However, this can sometimes be inefficient for interactive workloads when users submit related computations one after the other, so that the scheduler has no opportunity to plan ahead, and instead needs to recompute an intermediate result that was previously computed and garbage collected.

We could hold on to some of these results in hopes that the user will request them again. This trades active memory for potential CPU time. Ideally we would hold onto results that:

  1. Have a small memory footprint
  2. Take a long time to compute
  3. Are likely to be requested again (evidenced by recent behavior)

We did this for the single machine scheduler

We could do it in the distributed scheduler fairly easily by creating a SchedulerPlugin that watched all computations, selected computations to keep based on logic similar to what is currently in cachey, and created a fake Client to keep an active reference to those keys in the scheduler.

mrocklin commented 7 years ago

To be explicit, the mechanism to keep data on the cluster might look like this:

class CachingPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.add_plugin(self)

    def transition(self, key, start, finish, nbytes=None, startstops=None, *args, **kwrags):
        if start == 'processing' and finish == 'memory' and should_keep(nbytes, startstops, **kwargs):
            self.scheduler.client_desires_keys(keys=[key], client='fake-caching-client')
        no_longer_desired_keys = self.cleanup()
        self.scheduler.client_releases_keys(keys=no_longer_desired_keys, client='fake-caching-client')

client.run_on_scheduler(lambda dask_scheduler: CachingPlugin(dask_scheduler)
collinwo commented 6 years ago

Thanks. I followed the above example to write an customized cache plugin for our own dask grid now. Trying to test it for some time.

IPetrik commented 5 years ago

To be explicit, the mechanism to keep data on the cluster might look like this:

class CachingPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.add_plugin(self)

    def transition(self, key, start, finish, nbytes=None, startstops=None, *args, **kwrags):
        if start == 'processing' and finish == 'memory' and should_keep(nbytes, startstops, **kwargs):
            self.scheduler.client_desires_keys(keys=[key], client='fake-caching-client')
        no_longer_desired_keys = self.cleanup()
        self.scheduler.client_releases_keys(keys=no_longer_desired_keys, client='fake-caching-client')

client.run_on_scheduler(lambda dask_scheduler: CachingPlugin(dask_scheduler)

@mrocklin is the scheduler API explained somewhere? Can you provide more explanation of how this works? What do client_desires_keys and client_releases_keys do?

TomAugspurger commented 5 years ago

Scheduler plugins are at https://distributed.dask.org/en/latest/plugins.html and the Scheduler API is at https://distributed.dask.org/en/latest/scheduling-state.html#distributed.scheduler.Scheduler

On Thu, Jun 13, 2019 at 12:11 PM IPetrik notifications@github.com wrote:

To be explicit, the mechanism to keep data on the cluster might look like this:

class CachingPlugin(SchedulerPlugin): def init(self, scheduler): self.scheduler = scheduler self.scheduler.add_plugin(self)

def transition(self, key, start, finish, nbytes=None, startstops=None, *args, **kwrags):
    if start == 'processing' and finish == 'memory' and should_keep(nbytes, startstops, **kwargs):
        self.scheduler.client_desires_keys(keys=[key], client='fake-caching-client')
    no_longer_desired_keys = self.cleanup()
    self.scheduler.client_releases_keys(keys=no_longer_desired_keys, client='fake-caching-client')

client.run_on_scheduler(lambda dask_scheduler: CachingPlugin(dask_scheduler)

@mrocklin https://github.com/mrocklin is the scheduler API explained somewhere? Can you provide more explanation of how this works? What do client_desires_keys and client_releases_keys do?

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/681?email_source=notifications&email_token=AAKAOISH2A65RLTVGQOYDYLP2J5VDA5CNFSM4CWPLB6KYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGODXUMQQQ#issuecomment-501794882, or mute the thread https://github.com/notifications/unsubscribe-auth/AAKAOIW2G7Z4QK2QTXCVPZDP2J5VDANCNFSM4CWPLB6A .

GenevieveBuckley commented 3 years ago

I recently talked with the ilastik team, one of the wishlist items they brought up was 2 level caching (caching to disk, or in RAM) that would work with Dask distributed.

@emilmelnikov this issue is likely the best place for discussion

kephale commented 1 year ago

@GenevieveBuckley We also care about it in napari now. With the large remote datasets (100s+ GB) we're fetching it would be great to be able to have some on-disk persistence, given that the datasets do not change and we frequently want to revisit the same dataset.

GenevieveBuckley commented 1 year ago

Have you talked to @dcherian about this, @kephale?

kephale commented 1 year ago

I will now. Thank you @GenevieveBuckley :)

jakirkham commented 1 year ago

This has been brought up in other issues (and is somewhat tangential to this issue), but would recommend looking at graphchain