dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

"distributed.protocol.core - CRITICAL - Failed to Serialize" with PyArrow 0.13 #2597

johnbensnyder closed this issue 5 years ago

johnbensnyder commented 5 years ago

Reading from Parquet is failing with PyArrow 0.13. Downgrading to PyArrow 0.12.1 seems to fix the problem. I've only encountered this when using the distributed client. Using a Dask dataframe by itself does not appear to be affected.

For example,

from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers=4))
ddf = dd.read_parquet('DATA/parquet/', engine='pyarrow')
ddf.set_index('index')

Gives

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
    on_error='raise')
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
    'recipient': self._peer_addr})
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])

Similarly,

from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers=4))
ddf = dd.read_parquet('DATA/parquet/', engine='pyarrow')
ddf.compute()

Causes this error

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

AttributeError: Can't pickle local object 'ParquetDataset._get_open_file_func.<locals>.open_file'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-3-a90291ebde1e> in <module>
      1 ft_inp_ddf = dd.read_parquet('DATA/ft_14_15_inputs_parquet/', engine = 'pyarrow')
----> 2 ft_inp_ddf.compute()

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2318             retries=retries,
   2319             user_priority=priority,
-> 2320             actors=actors,
   2321         )
   2322         packed = pack_data(keys, futures)

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2259 
   2260             self._send_to_scheduler({'op': 'update-graph',
-> 2261                                      'tasks': valmap(dumps_task, dsk3),
   2262                                      'dependencies': dependencies,
   2263                                      'keys': list(flatkeys),

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
   2769         elif not any(map(_maybe_complex, task[1:])):
   2770             return {'function': dumps_function(task[0]),
-> 2771                     'args': warn_dumps(task[1:])}
   2772     return to_serialize(task)
   2773 

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   2778 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   2779     """ Dump an object to bytes, warn if those bytes are large """
-> 2780     b = dumps(obj)
   2781     if not _warn_dumps_warned[0] and len(b) > limit:
   2782         _warn_dumps_warned[0] = True

/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    959     try:
    960         cp = CloudPickler(file, protocol=protocol)
--> 961         cp.dump(obj)
    962         return file.getvalue()
    963     finally:

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

/opt/conda/lib/python3.6/pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_tuple(self, obj)
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    398             # func is nested
    399             if lookedup_by_name is None or lookedup_by_name is not obj:
--> 400                 self.save_function_tuple(obj)
    401                 return
    402 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_list(self, obj)
    779 
    780         self.memoize(obj)
--> 781         self._batch_appends(obj)
    782 
    783     dispatch[list] = save_list

/opt/conda/lib/python3.6/pickle.py in _batch_appends(self, items)
    806                 write(APPENDS)
    807             elif n:
--> 808                 save(tmp[0])
    809                 write(APPEND)
    810             # else tmp is empty, and we're done

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

/opt/conda/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-x86_64-linux-gnu.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__
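
As noted at the top, pinning PyArrow back to 0.12.1 (e.g. pip install pyarrow==0.12.1, or the conda equivalent) works around both failures; a quick sanity check before starting the cluster:

import pyarrow
print(pyarrow.__version__)  # 0.12.1 works here; 0.13.0 reproduces the failures above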
gcoimbra commented 5 years ago

@martindurant

It seems that I was causing the problem myself. It doesn't seem to be PyArrow or Fastparquet after all: it happens when I read a CSV with dask.dataframe.read_csv and pass a dict_keys object instead of a list to the optional usecols argument. Dask then tries to serialize the following object (visible in my previous post):

['usecols', dict_keys(...)]

Removing the usecols argument, or passing it as an actual list, fixes the problem. I'm sorry for the trouble.
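
For illustration, a minimal sketch of the failing and working patterns (the file path and dtype mapping are hypothetical):

import dask.dataframe as dd

dtypes = {'a': 'int64', 'b': 'float64'}  # hypothetical columns

# Fails under the distributed client: a dict_keys view is not list-like
# ddf = dd.read_csv('data/*.csv', usecols=dtypes.keys())

# Works: wrap the keys in a list
ddf = dd.read_csv('data/*.csv', usecols=list(dtypes.keys()))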

Do you want me to try to fix the problem and submit a pull request?

martindurant commented 5 years ago

I'm not sure there's anything to fix: the (pandas) docstring says that usecols must be list-like.

martindurant commented 5 years ago

In any case, I'll close this, since we now know what's going on.

bhishanpdl commented 4 years ago

I got the same error on Jul 10, 2020, on a MacPro 2017 running macOS Catalina with a miniconda env.

How do I solve this error? Is it because I have a single laptop, and distributed needs a cluster of multiple computers?

# imports
import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
import dask.array as da
import dask_ml
import pyarrow

print([(x.__name__, x.__version__) for x in
       [np, pd, dask, dask_ml, pyarrow]])
# [('numpy', '1.18.5'), ('pandas', '1.0.5'), ('dask', '2.20.0'), ('dask_ml', '1.5.0'), ('pyarrow', '0.17.1')]

# data
a = da.random.normal(size=(2000, 2000), chunks=(1000, 1000)) # data
res = a.dot(a.T).mean(axis=0) # operation
res = res.persist()  # start computation in the background

# code
from dask.distributed import Client, progress

client = Client()  # use dask.distributed by default
progress(res)      # watch progress
res.compute()      # convert to final result when done if desired

Error

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
    res = _from_frames()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/scheduler.py", line 2491, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
    res = _from_frames()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-7-4d5bc96d9bb1> in <module>
      1 progress(res)      # watch progress
      2 
----> 3 res.compute()      # convert to final result when done if desired

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2590                     should_rejoin = False
   2591             try:
-> 2592                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2593             finally:
   2594                 for f in futures.values():

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1888                 direct=direct,
   1889                 local_worker=local_worker,
-> 1890                 asynchronous=asynchronous,
   1891             )
   1892 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    766         else:
    767             return sync(
--> 768                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    769             )
    770 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

CancelledError: 
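
A note on the question above: distributed does not require multiple computers. Client() with no arguments starts a LocalCluster of worker processes on the one machine. Separately, in the snippet above res.persist() runs before the Client is created, so it executes on Dask's default threaded scheduler rather than on the cluster; a reordered sketch of the same computation:

from dask.distributed import Client, progress
import dask.array as da

client = Client()  # starts a LocalCluster and registers it as the default scheduler

a = da.random.normal(size=(2000, 2000), chunks=(1000, 1000))
res = a.dot(a.T).mean(axis=0)
res = res.persist()  # now runs on the LocalCluster in the background
progress(res)        # watch progress
res.compute()        # gather the final result when done

The deserialization error itself looks unrelated to running on a single machine: the traceback paths show distributed 2.9.3 next to msgpack 1.0.0rc1, and msgpack 1.0 tightened its handling of map keys, so a version mismatch between those two packages is a plausible culprit.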
gcoimbra commented 4 years ago

> I'm not sure there's anything to fix: the (pandas) docstring says that usecols must be list-like.

Can't we warn users with a ValueError exception?
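
For example (purely hypothetical, not dask's actual code), a guard inside read_csv could look like:

# hypothetical validation helper, not present in dask
def _validate_usecols(usecols):
    if usecols is not None and not isinstance(usecols, (list, tuple, set)):
        raise ValueError(
            "usecols must be list-like, got %r; "
            "try list(your_dict.keys()) instead" % type(usecols)
        )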

martindurant commented 4 years ago

I don't think we're generally in a position to check the types of arguments that we pass on to other functions.
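
To make that design point concrete: dd.read_csv largely forwards its keyword arguments to pandas.read_csv on each partition, so dask has no type information of its own to validate against. A simplified sketch of the pass-through pattern (hypothetical helper name):

import pandas as pd

def read_one_partition(path, **kwargs):
    # dask-style pass-through: every keyword goes straight to pandas.read_csv,
    # which is the layer that actually documents and interprets them
    return pd.read_csv(path, **kwargs)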