MrPowers / dask-interop

Integration tests to demonstrate Dask's interoperability with other systems

Spark-written Parquet lakes can't be read with FastParquet #4

Open MrPowers opened 3 years ago

MrPowers commented 3 years ago

@martindurant - this test suite demonstrates how a Parquet lake written by Spark cannot be read with Dask using the FastParquet engine (it works with the PyArrow engine).

I'll see if I can figure out a fix ;)

martindurant commented 3 years ago

You mean the test function test_pyspark_parquet_to_dask_fastparquet?

MrPowers commented 3 years ago

@martindurant - yes, that's the one; sorry for not specifying. Here's how you can reproduce it on your machine:

test_pyspark_parquet_to_dask_fastparquet fails with this error message:

    def test_pyspark_parquet_to_dask_fastparquet():
>       actual_ddf = dd.read_parquet("resources/parquet/1", engine="fastparquet")

tests/test_parquet.py:16: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py:318: in read_parquet
    read_metadata_result = engine.read_metadata(
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/dask/dataframe/io/parquet/fastparquet.py:758: in read_metadata
    parts, pf, gather_statistics, base_path = _determine_pf_parts(
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/dask/dataframe/io/parquet/fastparquet.py:189: in _determine_pf_parts
    pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/fastparquet/api.py:105: in __init__
    basepath, fmd = metadata_from_many(fn, verify_schema=verify,
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/fastparquet/util.py:151: in metadata_from_many
    pf0 = api.ParquetFile(f0, open_with=open_with)
../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/fastparquet/api.py:131: in __init__
    self._parse_header(f, verify)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'ParquetFile' object has no attribute '_schema'") raised in repr()] ParquetFile object at 0x16b2df9d0>
f = <_io.BufferedReader name='/Users/powers/Documents/code/my_apps/dask-interop/resources/parquet/1/._SUCCESS.crc'>
verify = False

    def _parse_header(self, f, verify=True):
        if self.fn and self.fn.endswith("_metadata"):
            #  no point attempting to read footer only for pure metadata
            data = f.read()[4:-8]
            self._head_size = len(data)
        else:
            try:
                f.seek(0)
                if verify:
                    assert f.read(4) == b'PAR1'
                f.seek(-8, 2)
                head_size = struct.unpack('<i', f.read(4))[0]
                if verify:
                    assert f.read() == b'PAR1'
                self._head_size = head_size
>               f.seek(-(head_size + 8), 2)
E               OSError: [Errno 22] Invalid argument

../../../../opt/miniconda3/envs/dask-interop/lib/python3.9/site-packages/fastparquet/api.py:179: OSError

My guess is that the _SUCCESS file is causing the issue (the file being parsed when the error is raised is its checksum file, ._SUCCESS.crc).
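A minimal sketch of why the read blows up, based only on what the traceback shows: _parse_header reads a 4-byte little-endian length from just before the last 4 bytes of whatever file it is handed, then seeks back by that length plus 8. With verify = False, as in the traceback, the PAR1 checks are skipped, so for a tiny non-Parquet file the "length" comes from arbitrary bytes and the seek lands before the start of the file, which the OS rejects with EINVAL. The helper looks_like_parquet below is hypothetical (not part of fastparquet or Dask), and the sample payloads are made up for illustration.

```python
import os
import struct
import tempfile

MAGIC = b"PAR1"


def looks_like_parquet(path):
    """Hypothetical helper: the same header/footer reads as fastparquet's
    _parse_header, but returning False instead of raising when the file
    cannot be a Parquet file."""
    size = os.path.getsize(path)
    # Smallest possible framing: leading magic + 4-byte footer length + trailing magic
    if size < 12:
        return False
    with open(path, "rb") as f:
        if f.read(4) != MAGIC:
            return False
        f.seek(-8, os.SEEK_END)
        footer_len = struct.unpack("<i", f.read(4))[0]
        if f.read(4) != MAGIC:
            return False
    # The footer must fit between the leading magic and the 8-byte trailer;
    # otherwise seeking back by footer_len + 8 would land before byte 0.
    return 0 <= footer_len <= size - 12


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        samples = {
            # Parquet framing around an opaque 6-byte "footer"
            "part-00000.parquet": MAGIC + b"footer" + struct.pack("<i", 6) + MAGIC,
            # Arbitrary 8-byte stand-in, not the real checksum contents
            "._SUCCESS.crc": b"crc\x00\x00\x00\x02\x00",
            "_SUCCESS": b"",
        }
        for name, payload in samples.items():
            path = os.path.join(d, name)
            with open(path, "wb") as f:
                f.write(payload)
            print(name, looks_like_parquet(path))
```

A check like this only looks at the framing; it says nothing about whether the footer itself is valid Thrift metadata.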

Sorry for not giving you a proper bug report in the previous message.

martindurant commented 3 years ago

> _SUCCESS file is causing the issue

Yes, I think that's right, so this is happening in Dask, not fastparquet (I bet pd.read_parquet works fine on the same directory). You can, of course, test this by removing _SUCCESS, which I think you did in the original issue.

MrPowers commented 3 years ago

@martindurant - Thanks for commenting and thanks for your patience. I'm a n00b.

Looks like this lets Dask read a Parquet lake written by PySpark with FastParquet:

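import os
import dask.dataframe as dd
# Glob only the part files, so Spark's _SUCCESS marker and .crc checksums are never opened
actual_ddf = dd.read_parquet(
    os.path.join("resources/parquet/1", "*.parquet"), engine="fastparquet"
)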
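To see why the glob helps, here is a small stdlib sketch that recreates a Spark-style output directory with hypothetical file names (one data file, its checksum, the _SUCCESS marker, and the marker's checksum) and lists what "*.parquet" matches. Dask expands globs through fsspec rather than Python's glob module, so this is only an approximation of what Dask sees; the point is that the pattern excludes _SUCCESS and every .crc file by suffix alone, regardless of how hidden dotfiles are handled.

```python
import glob
import os
import tempfile

# Hypothetical names imitating a Spark output directory on a local filesystem
SPARK_STYLE_FILES = [
    "part-00000-0000.snappy.parquet",
    ".part-00000-0000.snappy.parquet.crc",
    "_SUCCESS",
    "._SUCCESS.crc",
]


def matched_files(directory, pattern="*.parquet"):
    """Return the sorted base names that `pattern` matches inside `directory`."""
    return sorted(os.path.basename(p) for p in glob.glob(os.path.join(directory, pattern)))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        for name in SPARK_STYLE_FILES:
            # Empty placeholder files are enough to exercise the pattern
            open(os.path.join(d, name), "wb").close()
        print("all files:", sorted(os.listdir(d)))
        print("*.parquet:", matched_files(d))
```

If a writer ever used a different data-file suffix, the pattern would need to change with it.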

I think FastParquet might be having trouble because there isn't a metadata file. Can you share any additional info on the _metadata file, such as the data it contains and its file format? The metadata file has always been mysterious to me. Thanks for the help!
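As background for the question, and hedged as a summary of the Parquet format rather than anything stated in this thread: a _metadata file is itself a Parquet file with no data pages. It holds a leading PAR1, a Thrift-encoded FileMetaData footer that collects the row-group metadata of every part file (with file_path entries pointing at the files that hold the data), then the 4-byte little-endian footer length and a trailing PAR1. That is why the _parse_header code in the traceback can take f.read()[4:-8] for it. The stdlib sketch below contrasts the two ways of locating the footer bytes; the function names are hypothetical and the footer is an opaque placeholder, not real Thrift data.

```python
import struct

MAGIC = b"PAR1"


def footer_via_trailer(blob):
    """Locate the footer as for a data file: read the 4-byte little-endian
    length stored just before the trailing magic."""
    if len(blob) < 12 or blob[:4] != MAGIC or blob[-4:] != MAGIC:
        raise ValueError("not Parquet-framed")
    (footer_len,) = struct.unpack("<i", blob[-8:-4])
    if not 0 <= footer_len <= len(blob) - 12:
        raise ValueError("footer length does not fit in the file")
    return blob[-8 - footer_len:-8]


def footer_of_metadata_only_file(blob):
    """For a footer-only file such as _metadata, everything between the
    leading magic and the 8-byte trailer is the footer."""
    return blob[4:-8]


def frame_footer(footer):
    """Wrap opaque footer bytes in Parquet framing with no data pages."""
    return MAGIC + footer + struct.pack("<i", len(footer)) + MAGIC


if __name__ == "__main__":
    fake_footer = b"<thrift FileMetaData would go here>"  # placeholder bytes
    blob = frame_footer(fake_footer)
    # Both approaches agree when the file contains nothing but the footer
    print(footer_via_trailer(blob) == footer_of_metadata_only_file(blob) == fake_footer)
```

For a data file the two approaches differ, because column data sits between the leading magic and the footer; only the trailer-based lookup works there.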