blaze / castra

Partitioned storage system based on blosc. **No longer actively maintained.**
BSD 3-Clause "New" or "Revised" License
153 stars 21 forks source link

castra 'TypeError: data type not understood' when storing categoricals #56

Open user32000 opened 8 years ago

user32000 commented 8 years ago
dd = df.from_pandas(d, npartitions=2)

dd.dtypes
Out[49]: 
a             int64
b    datetime64[ns]
c            object
dtype: object

c = dd.to_castra('delme0.castra')

c = None

c = dd.to_castra('delme1.castra', categories=True)

c
Out[54]: <castra.core.Castra at 0x10983eb00>

ee = df.from_castra('delme1.castra')

ee
Out[56]: dd.DataFrame<from-castra-7c5f3b6d9b74449a9e27408736e8859a, divisions=(0, 4, 9)>

ee.dtypes
Out[57]: 
a             int64
b    datetime64[ns]
c          category
dtype: object

c = ee.to_castra('delme2.castra')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-58-62c09f024c21> in <module>()
----> 1 c = ee.to_castra('delme2.castra')

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/core.py in to_castra(self, fn, categories, sorted_index_column, compute)
   1409         from .io import to_castra
   1410         return to_castra(self, fn, categories, sorted_index_column,
-> 1411                          compute=compute)
   1412 
   1413     def to_bag(self, index=False):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/io.py in to_castra(df, fn, categories, sorted_index_column, compute)
    769     keys = [(name, -1), (name, df.npartitions - 1)]
    770     if compute:
--> 771         c, _ = DataFrame._get(dsk, keys, get=get_sync)
    772         return c
    773     else:

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
     41         get = get or _globals['get'] or cls._default_get
     42         dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43         return get(dsk2, keys, **kwargs)
     44 
     45     @classmethod

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_sync(dsk, keys, **kwargs)
    514     queue = Queue()
    515     return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 516                      raise_on_exception=True, **kwargs)
    517 
    518 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    485             f(key, res, dsk, state, worker_id)
    486         while state['ready'] and len(state['running']) < num_workers:
--> 487             fire_task()
    488 
    489     # Final reporting

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in fire_task()
    456         # Submit
    457         apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458                                         get_id, raise_on_exception])
    459 
    460     # Seed initial tasks into the thread pool

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in apply_sync(func, args, kwds)
    506 def apply_sync(func, args=(), kwds={}):
    507     """ A naive synchronous version of apply_async """
--> 508     return func(*args, **kwds)
    509 
    510 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
    262     """
    263     try:
--> 264         result = _execute_task(task, data)
    265         id = get_id()
    266         result = key, result, None, id

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in <listcomp>(.0)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    244         func, args = arg[0], arg[1:]
    245         args2 = [_execute_task(a, cache) for a in args]
--> 246         return func(*args2)
    247     elif not ishashable(arg):
    248         return arg

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in extend(self, df)
    217         # Store columns
    218         for col in df.columns:
--> 219             pack_file(df[col].values, self.dirname(partition_name, col))
    220 
    221         # Store index

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in pack_file(x, fn, encoding)
    390     if x.dtype != 'O':
    391         bloscpack.pack_ndarray_file(x, fn, bloscpack_args=bp_args,
--> 392                 blosc_args=blosc_args(x.dtype))
    393     else:
    394         bytes = blosc.compress(msgpack.packb(x.tolist(), encoding=encoding), 1)

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in blosc_args(dt)
     28 
     29 def blosc_args(dt):
---> 30     if np.issubdtype(dt, int):
     31         return bloscpack.BloscArgs(dt.itemsize, clevel=3, shuffle=True)
     32     if np.issubdtype(dt, np.datetime64):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
    759     else:
    760         val = mro[0]
--> 761     return issubclass(dtype(arg1).type, val)
    762 
    763 

TypeError: data type not understood

(The failing frame is `blosc_args` in `castra/core.py`, where `np.issubdtype(dt, int)` is called with the column's dtype. For the `category` column, `dt` is a pandas categorical dtype, which NumPy's `np.dtype(...)` cannot interpret — hence the `TypeError: data type not understood`.)