dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Random KeyError during long computations #2372

Closed rainwoodman closed 5 years ago

rainwoodman commented 5 years ago

I see random KeyErrors from cache.py. This is a bit difficult to isolate.

Here is my current setup:

  • A distributed cluster of 56 processes, each running 2 threads.
  • The dask scheduler is set to distributed.
  • Dask tasks are submitted from other tasks, with two levels of nesting: the innermost level uses dask.compute, while the outer levels use the worker_client() context manager during submission and reaping.
  • dask version 0.20.0
  • distributed version 1.24.0

Here is the error stack trace from the errored Future object. The key is not found in starttimes. This happens very infrequently: I have 747 successful tasks and only 16 failed tasks, all doing almost the same thing. There is no obvious pattern related to whether a task reads more or less data, or to whether the worker is stressed by memory or CPU load.

I wonder if this has something to do with how keys are synced in cache.py when data is fetched from another worker?

~/.conda/envs/bccp/bin/dask-worker in read_slice()
     15         if 'Density' in cat.columns:
     16             pos, den = cat.compute(cat['Position'][start:end],
---> 17                                    cat['Density'][start:end])
     18         else:
     19             pos = cat.compute(cat['Position'][start:end])

~/.conda/envs/bccp/lib/python3.6/site-packages/nbodykit/base/catalog.py in compute()
    549         # compute using global cache
    550         with GlobalCache.get():
--> 551             toret = dask.compute(*args, **kwargs)
    552 
    553         # do not return tuples of length one

~/.conda/envs/bccp/lib/python3.6/site-packages/dask/base.py in compute()
    390     keys = [x.__dask_keys__() for x in collections]
    391     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 392     results = schedule(dsk, keys, **kwargs)
    393     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    394 

~/.conda/envs/bccp/lib/python3.6/site-packages/dask/local.py in get_sync()
    540     """
    541     kwargs.pop('num_workers', None)    # if num_workers present, remove it
--> 542     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    543 
    544 

~/.conda/envs/bccp/lib/python3.6/site-packages/dask/local.py in get_async()
    504                 finish_task(dsk, key, state, results, keyorder.get)
    505                 for f in posttask_cbs:
--> 506                     f(key, res, dsk, state, worker_id)
    507 
    508                 while state['ready'] and len(state['running']) < num_workers:

~/.conda/envs/bccp/lib/python3.6/site-packages/dask/cache.py in _posttask()
     54 
     55     def _posttask(self, key, value, dsk, state, id):
---> 56         duration = default_timer() - self.starttimes[key]
     57         deps = state['dependencies'][key]
     58         if deps:

KeyError: ('array-ca15c22e01e3904ee936f8ab1a09a98a', 13062, 0)
mrocklin commented 5 years ago

If you're able to provide a minimal reproducible example that would help identify what the cause is.


rainwoodman commented 5 years ago

I was able to narrow this down with some reasoning:

    550         with GlobalCache.get():
--> 551             toret = dask.compute(*args, **kwargs)
    552 

GlobalCache.get() creates a new cache object from dask.cache.Cache():

class GlobalCache(object):
    """
    A class to facilitate calculation using a global cache via
    :class:`dask.cache.Cache`.
    """
    _instance = None

    @classmethod
    def get(cls):
        """
        Return the global cache object. The default size is controlled
        by the ``global_cache_size`` global option; see :class:`set_options`.

        Returns
        -------
        cache : :class:`dask.cache.Cache`
            the cache object, as provided by dask
        """
        # if not created, use default cache size
        if not cls._instance:
            from dask.cache import Cache
            cls._instance = Cache(_global_options['global_cache_size'])

        return cls._instance

It appears the context manager updates a non-thread-local (process-global) variable holding the currently active cache? With multiple threads, cache keys would then go missing during one thread's compute call because the global cache gets swapped out by another thread.

Is there a thread-friendly way to control the cache object other than the context manager? dask.compute doesn't take a cache object.
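
For reference, here is my reading of the bookkeeping involved, as a simplified stand-in for dask.cache.Cache (the _posttask line matches the traceback above; the rest is my paraphrase, not the actual dask source):

from timeit import default_timer
from dask.callbacks import Callback

class PairedTimer(Callback):
    # simplified stand-in for the timing bookkeeping in dask.cache.Cache
    def __init__(self):
        self.starttimes = {}

    def _pretask(self, key, dsk, state):
        # called by the local scheduler right before a task runs
        self.starttimes[key] = default_timer()

    def _posttask(self, key, value, dsk, state, id):
        # assumes _pretask ran for this key and nothing mutated starttimes
        # in between; a second thread sharing the same instance (or swapping
        # the registration) can break that assumption -> KeyError
        duration = default_timer() - self.starttimes[key]
        return duration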

rainwoodman commented 5 years ago

Orthogonal to the race condition: it's a bit fishy that we are using our own cache object at all; we probably only do it to control the memory usage of the cache. Is there a dask config key we could use instead?

mrocklin commented 5 years ago

I'm still not entirely sure how it is that you're using the opportunistic cache object with the distributed scheduler. I'm not sure I know enough to properly comment here.

rainwoodman commented 5 years ago

I think this should crash even without the distributed scheduler. A minimal test case would be something like this (I will try it):

import dask
import dask.array as da
from dask.cache import Cache

def thread_main():
    # each thread registers the cache and runs a small synchronous compute
    cache = Cache(1e9)  # Cache needs a byte limit (or a cachey.Cache instance)
    a = da.zeros(100000, chunks=100)
    with cache:
        dask.compute(a, scheduler='synchronous')

Then run this from many, many threads for a long time, e.g. with a harness like the sketch below. We may need a more complicated graph for the error to show up more frequently.
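
For concreteness, something along these lines (a rough sketch; the thread and iteration counts are arbitrary):

import threading

def stress(n_threads=32, n_iterations=200):
    # hammer the shared callback registration from many threads at once
    def loop():
        for _ in range(n_iterations):
            thread_main()
    threads = [threading.Thread(target=loop) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

stress()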

mrocklin commented 5 years ago

I see. Your concern is that the local scheduler's callback mechanisms aren't threadsafe. Is that correct?

If so then yes, I don't think that any attempt was ever made to ensure this.


mrocklin commented 5 years ago

I'm curious, what is your broader objective? Is your goal to get opportunistic caching for the distributed scheduler? If so then there are probably other ways to approach this problem. https://github.com/dask/distributed/issues/681


rainwoodman commented 5 years ago

Our code (nbodykit) was written with the single-threaded synchronous scheduler in mind, but I am exploring extending it to the distributed scheduler. We have a simulation post-processing pipeline that we want to run with dask on a few nodes (around one hundred processes).

This issue was triggered by our mechanism for selectively enabling our opportunistic cache. I don't find that mechanism very useful, so as a workaround I am now making sure the cache is always enabled:

https://github.com/bccp/nbodykit/commit/324a07118a259d1b566874b89ffda813d1183373

(I suppose cachey itself would then have to be thread-safe.)
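
In practice the workaround amounts to keeping one process-wide cache registered at all times, instead of entering and leaving the context manager around each compute call; roughly this pattern (a sketch of the idea, not the exact code from the commit):

from dask.cache import Cache

_global_cache = Cache(2e9)   # byte limit; the real value comes from our global options
_global_cache.register()     # registered once, never unregistered per-compute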

What is the currently expected behavior if I use the distributed scheduler and register an opportunistic cache object? I am concerned that your comments suggest the distributed scheduler shouldn't be talking to any registered opportunistic cache object at all.

mrocklin commented 5 years ago

The opportunistic cache in the dask/dask codebase uses the callback mechanism designed for the local scheduler. This callback mechanism is only useful for the local scheduler and is not supported for the distributed scheduler, which operates entirely differently.
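
To illustrate what I mean by the callback mechanism (a toy example; the PrintKeys class is made up, but the hook names are the ones dask.callbacks.Callback defines):

import dask
import dask.array as da
from dask.callbacks import Callback

class PrintKeys(Callback):
    # _start, _pretask, _posttask and _finish are hooks invoked by the local
    # schedulers; dask.cache.Cache plugs into these same hooks
    def _pretask(self, key, dsk, state):
        print('computing', key)

x = da.ones(10, chunks=5).sum()
with PrintKeys():
    dask.compute(x, scheduler='threads')   # hooks fire: local scheduler
# with a distributed Client driving the computation, the work happens on the
# cluster and these local callbacks are not invoked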


rainwoodman commented 5 years ago

Thanks for the explanation. That is to say, if I do not force the compute to use the synchronous scheduler, there shouldn't be an issue. I will try it out.

I can confirm that after no longer grabbing the cache context manager I am not seeing the errors. So the KeyError is due to the thread-unsafety of the opportunistic-cache context manager. Should this be fixed, or just documented?

A fix would probably mean adding an argument to dask.compute (and propagating it through the related functions all the way down to where the hooks are triggered).
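
Purely as a sketch of the shape such an API could take (the callbacks= keyword below is hypothetical, not an existing dask.compute option):

import dask
import dask.array as da
from dask.cache import Cache

cache = Cache(1e9)
a = da.zeros(100000, chunks=100)
# hypothetical keyword: scope the cache's hooks to this call rather than
# registering it in a process-global variable
dask.compute(a, scheduler='synchronous', callbacks=[cache])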

mrocklin commented 5 years ago

I'm inclined not to fix this until it's more clearly useful, though I'm not sure. It would probably depend on how expensive it would be to fix and maintain.


rainwoodman commented 5 years ago

It is rare (and arguably poor design) to override the opportunistic cache from multiple threads, as far as my case shows.

I think the calculus weighs towards waiting for more reports before diving into a fix. I'll close this ticket. Thanks!