dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

client.submit not working with client.upload_file after first time #3739

Open huseinzol05 opened 4 years ago

huseinzol05 commented 4 years ago

Below are my local and cloud Dask versions:

Dask version: 2.14
Distributed version: 2.14

We have a production Dask cluster in GKE that is shared among our engineers. We want to create a Python script for boilerplate purposes, simply named boilerplate.py,

def inc(x):
    if x > 10:
        raise Exception('x > 10')
    return x + 1

And our client script,

from dask.distributed import Client
import boilerplate as bp
bootstrap = # proxied
client = Client(bootstrap)
client.upload_file('boilerplate.py')

client.submit(bp.inc,7).result() # no error
client.submit(bp.inc,11).result() # got error

Now we fixed boilerplate.py,

def inc(x):
    if x > 20:
        raise Exception('x > 20')
    return x + 1

And rerun the client script to re-upload the fixed boilerplate.py,

from dask.distributed import Client
import boilerplate as bp
bootstrap = # proxied
client = Client(bootstrap)
client.upload_file('boilerplate.py')

client.submit(bp.inc,7).result() # no error
client.submit(bp.inc,11).result() # got error

It is still the same exception,

Exception: x > 10

The problem is that we cannot restart our Dask cluster, since that might disturb other important tasks. There seems to be some caching layer involved: this only happens when we use upload_file + submit; with upload_file + run, the latest uploaded boilerplate.py is used, as in the sketch below.
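A minimal sketch of that contrast (my reading of the report above; it assumes the client and bp from the snippets above, with the fixed boilerplate.py already uploaded):

import boilerplate as bp

# client.run executes the function directly on each worker and sees the freshly uploaded module
client.run(bp.inc, 11)
# client.submit goes through the task path and still hits the stale function
client.submit(bp.inc, 11).result()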

TomAugspurger commented 4 years ago

What happens if you include a client.run(importlib.reload(boilerplate)) call to reload the module? Reloading Python code at runtime is always fraught (what do you do with existing classes?), but it may work here.
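One way to express that suggestion with client.run (a sketch; the helper name reload_boilerplate is mine, and it assumes boilerplate.py is importable on the workers):

def reload_boilerplate():
    import importlib
    import boilerplate
    # reload the module in each worker's interpreter and report which file was loaded
    return importlib.reload(boilerplate).__file__

client.run(reload_boilerplate)  # runs once on every worker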

TomAugspurger commented 4 years ago

You might also need to manually clear some data off the workers. I don't know if the function gets cached in the worker's .data attribute.
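A sketch of how to check that, using the dask_worker argument that client.run injects when the submitted function accepts it (only .data is assumed here, nothing else about the Worker object):

def peek_worker_data(dask_worker):
    # dask_worker is the Worker instance; .data is its key/value store
    return list(dask_worker.data)

client.run(peek_worker_data)  # returns {worker address: stored keys}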

huseinzol05 commented 4 years ago

Tried,

import boilerplate as bp
def reload_package(package):
    import sys
    import importlib
    importlib.reload(sys.modules[package])
    return list(sys.modules.keys())

r = client.run(reload_package, 'boilerplate')
import boilerplate

x = client.submit(boilerplate.inc, 21)
x.result() # error

Some of the workers reloaded, but some did not, which is weird.

TomAugspurger commented 4 years ago

Let us know if you're able to do more debugging.

Given the complexities here, I don't really see how Dask can support re-uploading modules with upload_file. I'd recommend investigating alternative ways of doing this.

We can reopen if you find a specific issue or a solution that works well generally.

huseinzol05 commented 4 years ago

We found the issue,

https://github.com/dask/distributed/blob/master/distributed/worker.py#L3268,

def loads_function(bytes_object):
    """ Load a function from bytes, cache bytes """
    if len(bytes_object) < 100000:
        try:
            result = cache_loads[bytes_object]
        except KeyError:
            result = pickle.loads(bytes_object)
            cache_loads[bytes_object] = result
        return result
    return pickle.loads(bytes_object)

Even after we uploaded the new boilerplate.py, cloudpickle always dumps bp.inc to the same bytes,

import cloudpickle

cloudpickle.loads(cloudpickle.dumps(bp.inc))
# <function boilerplate.inc(x)>

Even though boilerplate.inc has already been reloaded, the LRU cache is not invalidated; because cloudpickle always dumps the same bytes, the cached lookup will always return the old boilerplate.inc(x) function.
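A small illustration of that, assuming the boilerplate.py above is importable: the serialised form only references the module and qualified name, so the bytes stay identical even after the module is edited and reloaded.

import importlib
import cloudpickle
import boilerplate as bp

before = cloudpickle.dumps(bp.inc)
importlib.reload(bp)
after = cloudpickle.dumps(bp.inc)
print(before == after)  # expected: True, so the worker-side LRU cache keeps returning the old function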

We tried patching loads_function to allow disabling the cache,

cache_loads = LRU(maxsize = 100)
enable_caching = False

def loads_function(bytes_object):
    """ Load a function from bytes, cache bytes """
    logging.debug('enter loads_function')
    if len(bytes_object) < 100_000 and enable_caching:
        try:
            result = cache_loads[bytes_object]
        except KeyError:
            result = pickle.loads(bytes_object)
            cache_loads[bytes_object] = result
        return result
    return pickle.loads(bytes_object)

Simulate a Dask cluster,

dask-scheduler --host localhost
dask-worker localhost:8786

Inside boilerplate.py,

def inc(x):
    if x > 10:
        raise Exception('x > 10')
    return x + 1

And then in main,

client = Client('localhost:8786')

import boilerplate as bp
client.upload_file('boilerplate.py')
r = client.submit(bp.inc,10) # no error
r = client.submit(bp.inc,11) # error

Now change boilerplate.py, without needing to shut down the scheduler and worker,

def inc(x):
    if x > 20:
        raise Exception('x > 20')
    return x + 1

Now in main,

def reload_package(package):
    import sys
    import importlib

    try:
        result = importlib.reload(sys.modules[package])
    except Exception:
        result = None

    return (list(sys.modules.keys()), result)

# reload locally, re-upload the fixed module, then reload on every worker
reload_package('boilerplate')
client.upload_file('boilerplate.py')
r = client.run(reload_package, 'boilerplate')
r = client.submit(bp.inc, 11, pure=False)  # no error
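A note on the pure=False in the last line (my understanding of Dask's task-key semantics, not something stated above): with the default pure=True the key is derived deterministically from the function and arguments, so resubmitting bp.inc(11) while the earlier errored future is still alive just returns the remembered error; pure=False forces a fresh key so the call really runs again.

r_cached = client.submit(bp.inc, 11)             # may reuse the old key and its stored error
r_fresh = client.submit(bp.inc, 11, pure=False)  # guaranteed to execute again on a worker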

We should define an option in https://github.com/dask/distributed/config.py to disable LRU caching programmatically.
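A sketch of how such a switch could be used; the option name distributed.worker.cache-loads is hypothetical and does not exist in the current config schema:

import dask

# hypothetical option name, shown only to illustrate the proposal
dask.config.set({"distributed.worker.cache-loads": False})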

huseinzol05 commented 4 years ago

@TomAugspurger

7Eric9 commented 1 year ago

I got the same issue in version 2023.9.2: when I re-run upload_file to all distributed nodes, the new version of the code is not executed. Is there any workaround? I got some clues from https://github.com/dask/distributed/pull/3993, but cache_loads.data.clear() no longer works.
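For reference, the cache_loads.data.clear() expression mentioned here was typically wrapped in client.run so it reached every worker, roughly as below (a sketch; it assumes cache_loads is still defined in distributed.worker, which holds for the 2.x code quoted earlier but, as this comment notes, no longer for 2023.9.2):

def clear_function_cache():
    from distributed.worker import cache_loads
    # drop the cached function bytes so the next deserialisation re-runs pickle.loads
    cache_loads.data.clear()

client.run(clear_function_cache)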