dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License

Error trying to deserialize an object #855

Open lbonini94 opened 3 years ago

lbonini94 commented 3 years ago
from distributed.protocol import deserialize_bytes

What happened: An error is raised when trying to deserialize an already-fitted dask_ml.decomposition.PCA object.

OSError: Timed out trying to connect to tcp://10.xxx.120.x31:35487 after 30 s

I fit the model on my data and export a serialized file. When I try to load it on a different cluster, the file apparently tries to connect to the scheduler of the cluster it was trained on.

What you expected to happen: I expected that, regardless of the context (cluster IP, number of workers, ...), I would be able to import my PCA object. I have already tried dumping with both the pickle and dill libraries.

deserialize_bytes('gs://file.pkl')
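For context on why a connection attempt happens at all: the traceback further down shows that the pickled estimator contains a distributed Future, and Future.__setstate__ reconnects to the scheduler address that was stored at serialization time. A minimal toy sketch of that pattern (RemoteHandle and connect_to_scheduler are hypothetical stand-ins for distributed's internals, not its actual code):

```python
import pickle

def connect_to_scheduler(address):
    # Stand-in for distributed.get_client: in the real library this
    # opens a network connection and blocks until the (possibly
    # long-gone) scheduler answers. Here it just records the call.
    return f"client for {address}"

class RemoteHandle:
    # Toy illustration of why unpickling can try to open a network
    # connection: __setstate__ re-creates a client from a stored
    # address, so the pickle file is tied to that cluster.
    def __init__(self, address):
        self.address = address

    def __getstate__(self):
        return {"address": self.address}

    def __setstate__(self, state):
        self.address = state["address"]
        self.client = connect_to_scheduler(self.address)

handle = RemoteHandle("tcp://10.0.0.1:8786")
restored = pickle.loads(pickle.dumps(handle))
```

If the original scheduler is unreachable, the real equivalent of that connect_to_scheduler call is what times out after 30 s.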
TomAugspurger commented 3 years ago

Can you give a reproducible example? And perhaps post the full traceback?

Just looking at the code, I don't see what attribute would be causing the issue.

lbonini94 commented 3 years ago

As I mentioned, this error is raised when I try to deserialize a dask_ml.decomposition.PCA that was fitted on a different cluster.

OSError: Timed out trying to connect to tcp://10.126.121.131:35487 after 30 s
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    285         try:
--> 286             comm = await asyncio.wait_for(
    287                 connector.connect(loc, deserialize=deserialize, **connection_args),

~/anaconda3/envs/leroy/lib/python3.8/asyncio/tasks.py in wait_for(fut, timeout, loop)
    500             await _cancel_and_wait(fut, loop=loop)
--> 501             raise exceptions.TimeoutError()
    502     finally:

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
<ipython-input-7-4d0a81023b51> in <module>
----> 1 deserialize_bytes(bytes_)

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in deserialize_bytes(b)
    598         header = {}
    599     frames = decompress(header, frames)
--> 600     return merge_and_deserialize(header, frames)
    601 
    602 

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in merge_and_deserialize(header, frames, deserializers)
    472                 merged_frames.append(bytearray().join(frames[offset : offset + n]))
    473 
--> 474     return deserialize(header, merged_frames, deserializers=deserializers)
    475 
    476 

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in deserialize(header, frames, deserializers)
    404         )
    405     dumps, loads, wants_context = families[name]
--> 406     return loads(header, frames)
    407 
    408 

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in pickle_loads(header, frames)
     83         new.append(mv)
     84 
---> 85     return pickle.loads(x, buffers=new)
     86 
     87 

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/pickle.py in loads(x, buffers)
     71     try:
     72         if buffers:
---> 73             return pickle.loads(x, buffers=buffers)
     74         else:
     75             return pickle.loads(x)

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in __setstate__(self, state)
    378             c = Client.current(allow_global=False)
    379         except ValueError:
--> 380             c = get_client(address)
    381         Future.__init__(self, key, c)
    382         c._send_to_scheduler(

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/worker.py in get_client(address, timeout, resolve_address)
   3552         return client
   3553     elif address:
-> 3554         return Client(address, timeout=timeout)
   3555     else:
   3556         raise ValueError("No global client found and no address provided")

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    762             ext(self)
    763 
--> 764         self.start(timeout=timeout)
    765         Client._instances.add(self)
    766 

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in start(self, **kwargs)
   1008             self._started = asyncio.ensure_future(self._start(**kwargs))
   1009         else:
-> 1010             sync(self.loop, self._start, **kwargs)
   1011 
   1012     def __await__(self):

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

~/anaconda3/envs/leroy/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1098 
   1099         try:
-> 1100             await self._ensure_connected(timeout=timeout)
   1101         except (OSError, ImportError):
   1102             await self._close()

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1155 
   1156         try:
-> 1157             comm = await connect(
   1158                 self.scheduler.address, timeout=timeout, **self.connection_args
   1159             )

~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    308             await asyncio.sleep(backoff)
    309     else:
--> 310         raise OSError(
    311             f"Timed out trying to connect to {addr} after {timeout} s"
    312         ) from active_exception

OSError: Timed out trying to connect to tcp://10.126.121.131:35487 after 30 s
lbonini94 commented 2 years ago

Update: dask_ml.decomposition.PCA().mean_ is a dask.array, so I serialized components_ and mean_ as np.array. Then, on another cluster, I instantiate a new object and set these attributes. This is a workaround, but apparently it worked.
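The workaround described above can be sketched like this (the helper names export_pca_attrs and restore_pca_attrs are illustrative, not part of dask-ml):

```python
import pickle
import numpy as np

def export_pca_attrs(pca, path):
    # np.asarray forces a dask.array to a concrete NumPy value, so the
    # pickle file carries no reference to the training cluster.
    attrs = {
        "components_": np.asarray(pca.components_),
        "mean_": np.asarray(pca.mean_),
    }
    with open(path, "wb") as f:
        pickle.dump(attrs, f)

def restore_pca_attrs(pca, path):
    # Set the concrete attributes on a freshly constructed estimator.
    with open(path, "rb") as f:
        attrs = pickle.load(f)
    for name, value in attrs.items():
        setattr(pca, name, value)
    return pca
```

On the target cluster, calling restore_pca_attrs on a newly constructed dask_ml.decomposition.PCA would then yield a usable estimator without any attempt to reach the old scheduler.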

TomAugspurger commented 2 years ago

Thanks for the update. I don't see any reason why mean_ should be a Dask array. It should probably be set to the concrete value along with the rest in https://github.com/dask/dask-ml/blob/980b3cb84e65f5508004fa1cd767d2c1122bc581/dask_ml/decomposition/pca.py#L291-L304.

Are you interested in making a PR with tests to fix that?
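A hedged sketch of the kind of fix being suggested (the helper names and attribute list below are illustrative, not the actual dask-ml source): at the end of fit, every learned attribute, including mean_, should be reduced to a concrete NumPy array so the pickled estimator carries no lazy dask graph tied to the training cluster.

```python
import numpy as np

def _to_concrete(value):
    # Anything exposing .compute() (e.g. a dask.array.Array) is
    # materialized; concrete values pass through unchanged.
    return value.compute() if hasattr(value, "compute") else value

def finalize_fitted_attributes(
    estimator,
    attr_names=("components_", "explained_variance_", "mean_"),
):
    # Replace any lazy arrays left on the estimator with NumPy arrays,
    # so pickling it never tries to reconnect to a scheduler.
    for name in attr_names:
        if hasattr(estimator, name):
            setattr(estimator, name, _to_concrete(getattr(estimator, name)))
    return estimator
```

In dask-ml itself this would presumably mean adding mean_ to the batch of attributes already materialized in the linked block of pca.py, rather than a separate helper like this.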


lbonini94 commented 2 years ago

Hey @TomAugspurger, thanks for the opportunity. I will try as soon as possible, but I believe I won't be able to make the changes anytime soon.