dask / fastparquet

python implementation of the parquet columnar file format.
Apache License 2.0

errors from read_parquet with engine param as auto or fastparquet #143

Closed data-steve closed 7 years ago

data-steve commented 7 years ago

After closing out issue #142, I updated pyarrow, fastparquet, and dask from conda-forge: conda install fastparquet pyarrow dask -c conda-forge

I refreshed my IDE's Python interpreter and did some pyarrow work on just one file and then on a whole glob of files. All of that works great.

from glob import glob
from dask.dataframe import read_parquet

filelist = glob(data_path + "/*parquet")  # data_path points at the directory of parquet files
filelist

df = read_parquet(filelist[0], engine='arrow')
df.compute()

df = read_parquet(filelist, engine='arrow')
df.compute()

With arrow as the engine I can read the whole file glob or just one file, even without the _metadata files, which I previously had to generate to get fastparquet to work: writer.merge(filelist[1:]). I think pyarrow builds the metadata itself from the first parquet file in the glob, as of a couple of months ago.
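For reference, a minimal sketch of that metadata-generation step, assuming all the files share a schema and sit in one directory (fastparquet.writer.merge writes a _metadata summary file next to the data files):

import fastparquet

# Combine the footers of the individual data files into a single
# _metadata summary file in their common directory.
fastparquet.writer.merge(filelist)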

When I switch the engine to auto and try to use the same syntax as with arrow, I get TypeErrors related to how the path is specified.

df = read_parquet(filelist, engine='auto')

TypeError    Traceback (most recent call last)
<ipython-input-18-0efd07ac93b5> in <module>()
----> 1 df = read_parquet(filelist, engine='auto')

/Users/steve/anaconda/lib/python2.7/site-packages/dask/dataframe/io/parquet.pyc in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    284         return _read_fastparquet(fs, paths, file_opener, columns=columns,
    285                                  filters=filters,
--> 286                                  categories=categories, index=index)
    287     else:
    288         return _read_pyarrow(fs, paths, file_opener, columns=columns,

/Users/steve/anaconda/lib/python2.7/site-packages/dask/dataframe/io/parquet.pyc in _read_fastparquet(fs, paths, myopen, columns, filters, categories, index, storage_options)
     49 
     50     if len(paths) > 1:
---> 51         pf = fastparquet.ParquetFile(paths, open_with=myopen, sep=myopen.fs.sep)
     52     else:
     53         try:

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in __init__(self, fn, verify, open_with, sep)
     43                  sep=os.sep):
     44         try:
---> 45             fn2 = sep.join([fn, '_metadata'])
     46             self.fn = fn2
     47             with open_with(fn2, 'rb') as f:

TypeError: sequence item 0: expected string, list found
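As a side note on the traceback: the list of paths is handed straight to fastparquet.ParquetFile, whose constructor at this version assumes a single path string and tries sep.join([fn, '_metadata']) on it, hence the TypeError. A rough sketch of reading the same files with fastparquet directly, assuming a recent enough fastparquet in which ParquetFile accepts a list of data files:

import fastparquet

# Newer fastparquet builds the combined metadata in memory when given
# a list of data files, so no _metadata file is needed on disk.
pf = fastparquet.ParquetFile(filelist)
pdf = pf.to_pandas()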

If I pass just one file to it, I get a new error, which also showed up in #127.

df = read_parquet(filelist[0], engine='auto')  

RuntimeError    Traceback (most recent call last)
<ipython-input-17-1be3b36ee8cd> in <module>()
----> 1 df = read_parquet(data_path+"/", engine='fastparquet')

/Users/steve/anaconda/lib/python2.7/site-packages/dask/dataframe/io/parquet.pyc in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    284         return _read_fastparquet(fs, paths, file_opener, columns=columns,
    285                                  filters=filters,
--> 286                                  categories=categories, index=index)
    287     else:
    288         return _read_pyarrow(fs, paths, file_opener, columns=columns,

/Users/steve/anaconda/lib/python2.7/site-packages/dask/dataframe/io/parquet.pyc in _read_fastparquet(fs, paths, myopen, columns, filters, categories, index, storage_options)
     66 
     67     # Find an index among the partially sorted columns
---> 68     minmax = fastparquet.api.sorted_partitioned_columns(pf)
     69 
     70     if index is False:

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in sorted_partitioned_columns(pf)
    506     statistics
    507     """
--> 508     s = statistics(pf)
    509     columns = pf.columns
    510     out = dict()

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in statistics(obj)
    468 
    469     if isinstance(obj, ParquetFile):
--> 470         L = list(map(statistics, obj.row_groups))
    471         names = obj.columns
    472         d = {n: {col: [item[col].get(n, None) for item in L]

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in statistics(obj)
    465     if isinstance(obj, parquet_thrift.RowGroup):
    466         return {'.'.join(c.meta_data.path_in_schema): statistics(c)
--> 467                 for c in obj.columns}
    468 
    469     if isinstance(obj, ParquetFile):

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in <dictcomp>((c,))
    465     if isinstance(obj, parquet_thrift.RowGroup):
    466         return {'.'.join(c.meta_data.path_in_schema): statistics(c)
--> 467                 for c in obj.columns}
    468 
    469     if isinstance(obj, ParquetFile):

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in statistics(obj)
    454             return rv
    455         if s.max is not None:
--> 456             rv['max'] = encoding.read_plain(ensure_bytes(s.max), md.type, 1)[0]
    457         if s.min is not None:
    458             rv['min'] = encoding.read_plain(ensure_bytes(s.min), md.type, 1)[0]

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/encoding.pyc in read_plain(raw_bytes, type_, count, width)
     45         return read_plain_boolean(raw_bytes, count)
     46     # variable byte arrays (rare)
---> 47     return np.array(unpack_byte_array(raw_bytes, count), dtype='O')
     48 
     49 

/Users/steve/anaconda/lib/python2.7/site-packages/fastparquet/speedups.pyx in fastparquet.speedups.unpack_byte_array (fastparquet/speedups.c:3037)()
    164 
    165     if remaining != 0:
--> 166         raise RuntimeError("invalid input size (corrupted?)")
    167     return out

RuntimeError: invalid input size (corrupted?)

I get the same errors with engine='fastparquet' as I do with 'auto', which leads me to assume that 'auto' means 'fastparquet' at the moment; the docs confirm this.

I also tried changing how I point to the data, df = read_parquet(data_path+"/", engine='fastparquet'), because of what I read in issues like #137.

Aside from the errors I'm getting, from a user-experience perspective (which is all I'm qualified to give): if dask.read_parquet is going to allow two engines, it would be nice for both of them to work through the same top-level API in terms of what gets fed into the path parameter. Otherwise, maybe 'arrow' should be the default for 'auto', since it just works on a single file or a list of files, without the extra step of writer.merge(filelist[1:]) when there is no metadata.
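To illustrate the kind of uniform behaviour I mean, here is a purely hypothetical sketch (read_parquet_any is my own name, not part of dask) of a front end that accepts a single file, a list of files, or a directory and hands read_parquet a normalised argument:

import os
from glob import glob
from dask.dataframe import read_parquet

def read_parquet_any(path_or_files, engine='auto', **kwargs):
    # Hypothetical helper: normalise the input so both engines are
    # called the same way regardless of how the data is pointed to.
    if isinstance(path_or_files, (list, tuple)):
        files = list(path_or_files)
    elif os.path.isdir(path_or_files):
        files = sorted(glob(os.path.join(path_or_files, "*.parquet")))
    else:
        files = [path_or_files]
    if len(files) == 1:
        return read_parquet(files[0], engine=engine, **kwargs)
    return read_parquet(files, engine=engine, **kwargs)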

martindurant commented 7 years ago

I am in the process of releasing fastparquet today. Would you mind trying your code with the latest master version of fastparquet?

data-steve commented 7 years ago

With a pip install git+https://github.com/dask/fastparquet/, here is my output from the same script above. It works. In this one I've included the _metadata files in the folder.

[screenshot: output of the script above with the _metadata files present]

Weirdly though, it seems faster by half when I don't include the metadata files.

[screenshot: output of the same script without the _metadata files]

thanks

martindurant commented 7 years ago

OK, good to know it works. I would probably ascribe the faster run in your second round to caching, but it would be good to find that out for sure.

In general, the performance of fastparquet versus arrow will depend on a number of factors, such as the type of data stored and which framework wrote it.

data-steve commented 7 years ago

Here are the times for them with no _metadata files. Each round they are run in a different order, with an interpreter restart between each one. The dataframe comes from 5 snappy.parquet files written out by pyspark's parquet writer and has shape (126722, 16), with dtypes mostly object plus some int32.

[screenshots: timings for the three rounds]
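The numbers only survive as screenshots, but the comparison could be scripted roughly like this (data_path and the engine names are taken from the snippets above; ideally each engine is timed in a fresh interpreter so OS caching doesn't favour whichever runs second):

import time
from glob import glob
from dask.dataframe import read_parquet

filelist = sorted(glob(data_path + "/*parquet"))

for engine in ('arrow', 'fastparquet'):
    start = time.time()
    df = read_parquet(filelist, engine=engine).compute()
    print('%s %s %.2fs' % (engine, df.shape, time.time() - start))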

martindurant commented 7 years ago

Note that there is a startup cost when calling fastparquet for the first time to compile the numba-decorated functions. You might also be interested in https://github.com/andrix/python-snappy/pull/38 , which will improve performance for you if you were running the above with threads as opposed to processes. Also, I invite you to write the data out with dask/fastparquet and then time the reading - you will likely find a great improvement, especially if you convert as many object columns as appropriate to categorical.
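A minimal sketch of that last suggestion, assuming the data is already a dask dataframe df, and with object_cols and out_path as placeholders for the string columns worth converting and the output directory (the exact to_parquet signature has shifted a little between dask versions):

# Convert suitable object columns to categoricals, then write with the
# fastparquet engine; the categories are stored in the file and make
# subsequent reads cheaper.
df = df.categorize(columns=object_cols)
df.to_parquet(out_path, engine='fastparquet', compression='snappy')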

data-steve commented 7 years ago

Thanks, those are good tips. The typical workflow our boss wants is filtering/munging/joins in Spark, then using Python later because it's assumed to be faster on database dumps of tens of millions of rows each. So that's why the parquet files are the way they are.

I'm trying as much as possible to move everything to Python, and dask is a great option for this. I just have to find time to solve all the logistics that my team has already solved with Spark in the pipeline. I'd really love to see this all end to end in dask.

~ Steve

Sent via telepathy
