dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

"can't pickle thread.lock objects" when calling array.store with distributed #780

Open rabernat opened 7 years ago

rabernat commented 7 years ago

I am trying to store a dask array using distributed. When I call store, I get an error "can't pickle thread.lock objects".

I originally was trying this in a much more complex context involving netCDF, xarray, etc. But I managed to come up with the following minimal example.

import numpy as np
import dask.array as da
from distributed import Client

def create_and_store_dask_array():
    shape = (10000, 1000)
    chunks = (1000, 1000)
    data = da.zeros(shape, chunks=chunks)
    store = np.memmap('test.memmap', mode='w+', dtype=data.dtype, shape=data.shape)
    data.store(store)
    print("Success!")

create_and_store_dask_array()  # first call: runs on the default local scheduler
client = Client()
create_and_store_dask_array()  # second call: the graph is now serialized to the distributed scheduler

The first call works, but the second fails. The output is:

Success!
/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.pyc - INFO - Failed to serialize (<function store at 0x7f0ee802f488>, (<functools.partial object at 0x7f0ec84f1418>, (1000, 1000)), (slice(2000, 3000, None), slice(0, 1000, None)), <thread.lock object at 0x7f0f2c715af0>)
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 43, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 706, in dumps
    cp.dump(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 146, in dump
    return Pickler.dump(self, obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.pyc - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.py", line 43, in dumps
    for key, value in data.items()
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.py", line 44, in <dictcomp>
    if type(value) is Serialize}
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/serialize.py", line 106, in serialize
    header, frames = {}, [pickle.dumps(x)]
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 43, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 706, in dumps
    cp.dump(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 146, in dump
    return Pickler.dump(self, obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects

Versions:

>>> import dask
>>> print dask.__version__
0.12.0
>>> import distributed
>>> print distributed.__version__
1.14.3
rabernat commented 7 years ago

Seems related to dask/dask#1683.

mrocklin commented 7 years ago

Can you try the following?

# data.store(store)

from dask.utils import SerializableLock
lock = SerializableLock()
data.store(store, lock=lock)
rabernat commented 7 years ago

It works with SerializableLock. Thanks @mrocklin!

Is there a way to make this the default?

rabernat commented 7 years ago

P.S. I had to update to dask 0.13.0 in order to be able to import SerializableLock.

mrocklin commented 7 years ago

We could make it the default, yes. I suppose it doesn't have much overhead over standard locks.
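
For context, a minimal sketch of the idea behind SerializableLock (the class name and registry details below are illustrative, not dask's actual code): pickle a token instead of the lock itself, and rebind to a process-local lock on deserialization.

import threading
import uuid

class TokenLock(object):
    """Hypothetical sketch: a lock that pickles by token, not by OS handle."""
    _locks = {}  # token -> threading.Lock, shared within one process

    def __init__(self, token=None):
        self.token = token or str(uuid.uuid4())
        # Reuse this process's lock for the token, or create one.
        self.lock = self._locks.setdefault(self.token, threading.Lock())

    def acquire(self, *args):
        return self.lock.acquire(*args)

    def release(self):
        self.lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()

    def __reduce__(self):
        # Serialize only the token; the unpickled copy rebinds to a
        # process-local lock instead of the unpicklable OS handle.
        return (TokenLock, (self.token,))

Within one process, every copy deserialized from the same token contends on the same underlying lock; across processes they do not, which is the caveat that comes up again at the end of this thread.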

mrocklin commented 7 years ago

This just came in. You could ping there and see if he's interested in implementing this as well: https://github.com/dask/knit/issues/60

rabernat commented 7 years ago

So with SerializableLock it works at the dask level. But now I would like to wrap this array in an xarray Dataset and call to_netcdf. Unfortunately that function does not allow me to pass a custom lock. It appears that calls to dask.array.store automatically get passed a threading.Lock() object: https://github.com/pydata/xarray/blob/master/xarray/backends/common.py#L153

Should I raise this issue over at xarray then?

(I must admit I don't see the relevance of dask/knit#60 to this issue.)

mrocklin commented 7 years ago

Sorry, wrong copy-paste: https://github.com/dask/dask/pull/1879

mrocklin commented 7 years ago

Or you could also implement this yourself. Should be an easy change and it'd be good to have your name in the authors list :) (or I can do it, but that's boring)

mrocklin commented 7 years ago

We built SerializableLock with XArray in mind. I think that they started using it as well in some cases.

cc @shoyer

rabernat commented 7 years ago

Ah, it looks like there is already an xarray PR in progress that would address this: pydata/xarray#1179

pelson commented 7 years ago

Just ran into this too. Is it feasible to extend cloudpickle to dispatch thread.lock, rather than retrofitting SerializableLock usage?

mrocklin commented 7 years ago

I think that cloudpickle should probably still err when trying to serialize a lock; that's usually an unsafe thing to do.
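
To see why erring is the safer behavior, here is a deliberately unsafe Python 3 sketch: a hypothetical reducer that "pickles" a lock by constructing a fresh one on the other side, which is essentially all a generic dispatch could do.

import copyreg
import pickle
import threading

# Hypothetical and unsafe: pretend a lock can be pickled by simply
# rebuilding a new one at load time.
copyreg.pickle(type(threading.Lock()), lambda lock: (threading.Lock, ()))

lock = threading.Lock()
lock.acquire()                       # held in this process
copy = pickle.loads(pickle.dumps(lock))
print(copy.acquire(False))           # True: the copy is fresh and unheld,
                                     # so mutual exclusion silently vanishes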

mrocklin commented 7 years ago

@pelson would the solution in either of the two PRs suffice for you? I'm happy to spend some time to make this workable.

pelson commented 7 years ago

As it happens, I'm not sure it was exactly the same issue (as in, the same line of code within distributed). I added a print statement into cloudpickle (around L604), along the lines of:

        if state is not None:
            print('STATE:', state)
            save(state)
            write(pickle.BUILD)

That let me determine it was a completely different thread lock that I had hold of...

STATE: {'name': 'mds-logger', 'parent': <logging.RootLogger object at 0x7f4e15e50310>, 'handlers': [], 'level': 0, 'disabled': 0, 'manager': <logging.Manager object at 0x7f4e15e50190>, 'propagate': 1, 'filters': []}
STATE: {'name': 'root', 'parent': None, 'handlers': [<logging.StreamHandler object at 0x7f4defcbd090>], 'level': 20, 'disabled': 0, 'propagate': 1, 'filters': []}
STATE: {'stream': <open file '<stderr>', mode 'w' at 0x7f4e15fa91e0>, 'level': 0, 'lock': <_RLock owner=None count=0>, '_name': None, 'filters': [], 'formatter': <logging.Formatter object at 0x7f4defcbd1d0>}
STATE: {'_Verbose__verbose': False, '_RLock__owner': None, '_RLock__block': <thread.lock object at 0x7f4df056e8b0>, '_RLock__count': 0}

I got rid of my global logger and the issue disappeared. So this is really a cloudpickle issue, not a distributed one. For completeness, I have now tracked it down to the following:

>>> from cloudpickle import dumps
>>> import sys

>>> dumps(sys.stdout)
... fine

>>> dumps(sys.stderr)
... fine

>>> import logging
>>> logger = logging.getLogger('test')
>>> dumps(logger)
... fine

>>> logger.info('test')
>>> dumps(logger)
... fine

>>> logger.addHandler(logging.StreamHandler())
>>> dumps(logger)
...
TypeError: can't pickle thread.lock objects

:bomb:
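
A common workaround, sketched below using the hypothetical logger name from the STATE output above: look the logger up inside the function that gets shipped to workers, so that no handler (with its internal RLock) is captured in the closure.

import logging

def process(x):
    # Resolved on the worker at call time; the StreamHandler attached in
    # the driver process never needs to be pickled with this function.
    logger = logging.getLogger('mds-logger')
    logger.info('processing %s', x)
    return x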

Sorry for the noise on this issue. Happy to move this over to cloudpickle if it is helpful?

jakirkham commented 7 years ago

Also saw similar issues. FWIW, dill seems to have no problem pickling locks from threading, but cloudpickle does (https://github.com/cloudpipe/cloudpickle/issues/81). Though I'm unclear on whether one should ever be serializing locks from threading, instead of using, for instance, a lock designed to be shared between processes.
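
A quick illustration of the asymmetry (a sketch assuming dill is installed; exactly how dill rebuilds the lock is its own detail):

import threading
import dill  # assumed installed

lock = threading.Lock()
copy = dill.loads(dill.dumps(lock))  # round-trips without error under dill
print(copy is lock)  # False: a distinct lock object, so it cannot
                     # actually coordinate with the original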

mrocklin commented 7 years ago

@rabernat is your original issue resolved?

rabernat commented 7 years ago

I believe it is not resolved because dask/dask#1881 has not been merged yet. It is awaiting a test which I have not had time to figure out how to write.

returncode13 commented 5 years ago

Hi @mrocklin, is there an update on this issue? Are we still supposed to use SerializableLock?

mrocklin commented 5 years ago

I'm not sure that there is anything we can do here. In principle SerializableLock assumes that the place you're storing to can be written to from separate processes without coordination. That might not be true.

It would be good to improve error reporting, to point users to this option if it's right for them. I don't know of a good way of doing that though. If anyone has suggestions on how to improve this that would be helpful.
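
For anyone who does need real cross-worker coordination, rather than a lock that merely survives pickling, a named distributed.Lock is one option; a sketch reusing the names from the original example:

from distributed import Client, Lock

client = Client()
# A named distributed.Lock is mediated by the scheduler, so it excludes
# across workers and processes; SerializableLock only deduplicates
# copies within a single process.
lock = Lock('memmap-store')          # 'memmap-store' is an arbitrary name
data.store(store, lock=lock)         # `data` and `store` as defined above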
