dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Filtering using read_parquet results in empty partitions and an error when computing df #4100

Closed geo64 closed 5 years ago

geo64 commented 5 years ago

Reading a parquet file with a filter results in an error: AttributeError: 'tuple' object has no attribute 'tail'. This error only occurs for Dask versions >= 0.19.1 (it does not occur in 0.19.0).

The problem seems to be that, when a filter is applied, some of the returned partitions are empty, which causes compute to fail.

Fastparquet version: 0.1.6
Pandas version: 0.23.4
Python version: 3.6.4


import pandas as pd
import dask.dataframe as dd

# Eight rows over two partitions; 'a' takes the values 1-4
ddf = dd.from_pandas(
    pd.DataFrame({'a': [1, 1, 2, 2, 3, 3, 4, 4], 'b': range(8)}),
    npartitions=2,
)

# Write the frame partitioned on 'a'
dd.to_parquet(ddf, 'FooBar.parquet', partition_on=['a'])

# Read it back, keeping only rows with a > 2
ddf_new = dd.read_parquet('FooBar.parquet', filters=[('a', '>', 2)])
print(ddf_new.npartitions)
print('______')
print(ddf_new.get_partition(1).compute())
print('______')
print(ddf_new.compute())  # raises

martindurant commented 5 years ago

A quick test: this runs fine for me on dask and fastparquet master, and filters=[('a', '>', 3)] results in a 1-partition dataframe, as it should.
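
(His exact snippet isn't shown; a minimal check along those lines, reusing the FooBar.parquet dataset written above, would be:)

import dask.dataframe as dd

# With a > 3 only the a=4 directory survives the filter
ddf_new = dd.read_parquet('FooBar.parquet', filters=[('a', '>', 3)])
assert ddf_new.npartitions == 1
print(ddf_new.compute())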

geo64 commented 5 years ago

Thanks for looking into it so quickly.

Running 0.19.4 I get the error below; I also got the same error when installing dask from master.


---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-4-5221b85ab5f6> in <module>
     11 print(ddf_new.get_partition(3).compute())
     12 print('______')
---> 13 print(ddf_new.compute())

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    390                              collections=collections)
    391 
--> 392     dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    393     keys = [x.__dask_keys__() for x in collections]
    394     postcomputes = [x.__dask_postcompute__() for x in collections]

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/base.py in collections_to_dsk(collections, optimize_graph, **kwargs)
    192 
    193         dsk = merge(*(opt(dsk, keys, **kwargs)
--> 194                       for opt, (dsk, keys) in groups.items()))
    195     else:
    196         dsk, _ = _extract_graph_and_keys(collections)

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/base.py in <genexpr>(.0)
    192 
    193         dsk = merge(*(opt(dsk, keys, **kwargs)
--> 194                       for opt, (dsk, keys) in groups.items()))
    195     else:
    196         dsk, _ = _extract_graph_and_keys(collections)

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/dataframe/optimize.py in optimize(dsk, keys, **kwargs)
     14     from .io import dataframe_from_ctable
     15     if isinstance(keys, list):
---> 16         dsk, dependencies = cull(dsk, list(core.flatten(keys)))
     17     else:
     18         dsk, dependencies = cull(dsk, [keys])

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/optimization.py in cull(dsk, keys)
     44         out_keys += work
     45         deps = [(k, get_dependencies(dsk, k, as_list=True))  # fuse needs lists
---> 46                 for k in work]
     47         dependencies.update(deps)
     48         for _, deplist in deps:

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/optimization.py in <listcomp>(.0)
     44         out_keys += work
     45         deps = [(k, get_dependencies(dsk, k, as_list=True))  # fuse needs lists
---> 46                 for k in work]
     47         dependencies.update(deps)
     48         for _, deplist in deps:

~/.local/share/virtualenvs/insurance-SnJIoDKP/lib/python3.6/site-packages/dask/core.py in get_dependencies(dsk, key, task, as_list)
    203     """
    204     if key is not None:
--> 205         arg = dsk[key]
    206     elif task is not None:
    207         arg = task

KeyError: ('read-parquet-7b9eb5739bcdabeb8722949b8f9333e9', 3)
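
For context, the culling step in the traceback is where the mismatch surfaces: compute asks the optimizer to keep the keys the collection advertises, and one of them, partition 3 of the read-parquet layer, is no longer in the graph. A hand-built toy graph reproduces the same failure mode (purely illustrative, not the actual read_parquet graph):

from dask.optimization import cull

dsk = {('x', 0): 1, ('x', 1): 2}  # a graph with two trivial tasks
cull(dsk, [('x', 0)])             # fine: the requested key exists
cull(dsk, [('x', 2)])             # KeyError: ('x', 2), as in the traceback
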
martindurant commented 5 years ago

Would you mind trying with fastparquet master? (I assume you are using fastparquet, right? I don't think filters= does anything at all for pyarrow)
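
(For reference, read_parquet also takes an engine= keyword, so the backend can be pinned explicitly rather than relying on whichever library happens to be installed; a minimal sketch, reusing the dataset from above:)

import dask.dataframe as dd

# Force the fastparquet backend; filters= is honoured there
ddf_new = dd.read_parquet('FooBar.parquet',
                          engine='fastparquet',
                          filters=[('a', '>', 2)])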

geo64 commented 5 years ago

Yep, currently on fastparquet 0.1.6.

To install fastparquet from master I tried 'python setup.py install' and got the following error:


running install
running bdist_egg
running egg_info
writing fastparquet.egg-info/PKG-INFO
writing dependency_links to fastparquet.egg-info/dependency_links.txt
writing requirements to fastparquet.egg-info/requires.txt
writing top-level names to fastparquet.egg-info/top_level.txt
reading manifest file 'fastparquet.egg-info/SOURCES.txt'
reading manifest template 'MANIFEST.in'
warning: no files found matching '*.c' under directory 'fastparquet'
no previously-included directories found matching 'docs/_build'
writing manifest file 'fastparquet.egg-info/SOURCES.txt'
installing library code to build/bdist.macosx-10.13-x86_64/egg
running install_lib
running build_py
warning: build_py: byte-compiling is disabled, skipping.

running build_ext
building 'fastparquet.speedups' extension
clang -Wno-unused-result -Wsign-compare -Wunreachable-code -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -I/Users/gearoid/.pyenv/versions/3.6.4/include/python3.6m -I/Users/gearoid/.pyenv/versions/3.6.4/lib/python3.6/site-packages/numpy/core/include -c fastparquet/speedups.c -o build/temp.macosx-10.13-x86_64-3.6/fastparquet/speedups.o
clang: error: no such file or directory: 'fastparquet/speedups.c'
clang: error: no input files
error: command 'clang' failed with exit status 1

martindurant commented 5 years ago

Right, I think you would need Cython. I have just added the .c file directly to the repo, so you can try again.

Perhaps your original code should be included in the dask parquet tests?

geo64 commented 5 years ago

Using fastparquet on master solved it.

I think it's worth having it (or something similar) as a test case. Do you need me to do anything to productionise it?

martindurant commented 5 years ago

Mental note, then, to release fastparquet sometime.

mrocklin commented 5 years ago

> I think it's worth having it (or something similar) as a test case. Do you need me to do anything to productionise it?

@geo64 yes, a PR adding a test for this situation would be very welcome.
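
A rough sketch of what such a test could look like, assuming pytest's tmpdir fixture (the test name and assertions here are illustrative, not the PR as merged):

import pandas as pd
import dask.dataframe as dd


def test_read_parquet_filters_with_empty_partitions(tmpdir):
    # Write a dataset partitioned on 'a', then read it back with a
    # filter that leaves some of the partitions empty.
    path = str(tmpdir)
    df = pd.DataFrame({'a': [1, 1, 2, 2, 3, 3, 4, 4], 'b': range(8)})
    dd.to_parquet(dd.from_pandas(df, npartitions=2), path, partition_on=['a'])

    ddf = dd.read_parquet(path, filters=[('a', '>', 2)])
    result = ddf.compute()  # must not raise
    # Only rows with a in {3, 4}, i.e. b in 4..7, should survive
    assert sorted(result['b']) == [4, 5, 6, 7]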

jakirkham commented 5 years ago

It sounds like this was resolved. Is that correct? If so, would someone like to close this one out? If not, please feel free to correct me 🙂