apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Reading Hive-style partitioned parquet files from GCS #30481

Open asfimport opened 2 years ago

asfimport commented 2 years ago

Trying to read a Spark-generated, Hive-style partitioned parquet dataset with gcsfs and pyarrow, but getting a FileNotFoundError when reading from the base directory, and even when reading directly from one of the partitions. Not sure if I am doing something wrong or if this is not supported.

Note that I have successfully read this Hive-style partitioned parquet dataset using other methods, which rules out problems with the data itself.
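For reference, a minimal sketch of the failing call; the bucket and dataset paths below are placeholders, not the reporter's actual paths:

import gcsfs
import pyarrow.parquet as pq

gcs = gcsfs.GCSFileSystem()

# Reading the base directory of the partitioned dataset raises FileNotFoundError,
# as does pointing directly at a partition directory such as .../partition_var=some_value/.
table = pq.read_table("bucket-name/path/to/dataset", filesystem=gcs)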

Reporter: Garrett Weaver

Note: This issue was originally created as ARROW-14959. Please see the migration documentation for further details.

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: Just to confirm, if you read a specific file then it works? For example with:


import pyarrow.parquet as pq
# where `gcs` is your gcsfs.GCSFileSystem instance
pq.read_table("path/to/partitioned/dataset/base/dir/partition_var=some_value/<data file>.parquet", filesystem=gcs)

Can you try some things with the filesystem object and show what those return?


gcs.isdir("path/to/partitioned/dataset/base/dir/")
gcs.exists("path/to/partitioned/dataset/base/dir/")
gcs.info("path/to/partitioned/dataset/base/dir/")
gcs.find("path/to/partitioned/dataset/base/dir/", maxdepth=None, withdirs=True, detail=True)
mmwinther commented 1 year ago

@jorisvandenbossche We are experiencing this same bug with PyArrow v11. We have verified that the same partitioned directory can be read successfully by other means.

Code example

import gcsfs
import pyarrow.parquet as pq

gcs = gcsfs.GCSFileSystem()
parquet_ds = pq.ParquetDataset("gs://<redacted-bucket-name>/partition/dir", filesystem=gcs)

Traceback

<venv>/lib/python3.10/site-packages/pyarrow/parquet/core.py:1763: in __new__
    return _ParquetDatasetV2(
<venv>/lib/python3.10/site-packages/pyarrow/parquet/core.py:2477: in __init__
    self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
<venv>/lib/python3.10/site-packages/pyarrow/dataset.py:762: in dataset
    return _filesystem_dataset(source, **kwargs)
<venv>/lib/python3.10/site-packages/pyarrow/dataset.py:453: in _filesystem_dataset
    factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
pyarrow/_dataset.pyx:2236: in pyarrow._dataset.FileSystemDatasetFactory.__init__
    ???
pyarrow/error.pxi:144: in pyarrow.lib.pyarrow_internal_check_status
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   pyarrow.lib.ArrowInvalid: GetFileInfo() yielded path '<redacted-bucket-name>/partition/dir/part-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet', which is outside base dir 'gs://<redacted-bucket-name>/partition/dir'

pyarrow/error.pxi:100: ArrowInvalid

It appears to be caused simply by the removal of the gs:// prefix and the resulting mismatch between the discovered file paths and the base directory. I confirmed that the same dataset is read successfully when the gs:// prefix is omitted.
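A minimal sketch of that workaround, using the same (redacted) placeholder path as above:

import gcsfs
import pyarrow.parquet as pq

gcs = gcsfs.GCSFileSystem()

# Passing the path without the gs:// scheme keeps the discovered file paths
# consistent with the base directory, so dataset discovery succeeds.
parquet_ds = pq.ParquetDataset("<redacted-bucket-name>/partition/dir", filesystem=gcs)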

Details of filesystem object

gcs.isdir(partition_base_dir_path)
>> True
gcs.exists(partition_base_dir_path)
>> True
gcs.info(partition_base_dir_path)
>> {'bucket': '<redacted-bucket-name>',
    'name': '<redacted-bucket-name>/partition/dir',
    'size': 0,
    'storageClass': 'DIRECTORY',
    'type': 'directory'}
gcs.find(partition_base_dir_path, maxdepth=None, withdirs=True, detail=True)
{'<redacted-bucket-name>/partition/dir': {'Key': 'partition/dir',
  'Size': 0,
  'name': '<redacted-bucket-name>/partition/dir',
  'StorageClass': 'DIRECTORY',
  'type': 'directory',
  'size': 0},
 '<redacted-bucket-name>/partition/dir/': {'kind': 'storage#object',
  'id': '<redacted-bucket-name>/partition/dir//1645614930653539',
  'selfLink': 'https://www.googleapis.com/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2F',
  'mediaLink': 'https://storage.googleapis.com/download/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2F?generation=1645614930653539&alt=media',
  'name': '<redacted-bucket-name>/partition/dir/',
  'bucket': '<redacted-bucket-name>',
  'generation': '1645614930653539',
  'metageneration': '1',
  'contentType': 'application/octet-stream',
  'storageClass': 'STANDARD',
  'size': 0,
  'md5Hash': '1B2M2Y8AsgTpgAmY7PhCfg==',
  'crc32c': 'AAAAAA==',
  'etag': 'COPig6vZlfYCEAE=',
  'timeCreated': '2022-02-23T11:15:30.656Z',
  'updated': '2022-02-23T11:15:30.656Z',
  'timeStorageClassUpdated': '2022-02-23T11:15:30.656Z',
  'type': 'file'},
 '<redacted-bucket-name>/partition/dir/_SUCCESS': {'kind': 'storage#object',
  'id': '<redacted-bucket-name>/partition/dir/_SUCCESS/1645614930849546',
  'selfLink': 'https://www.googleapis.com/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2F_SUCCESS',
  'mediaLink': 'https://storage.googleapis.com/download/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2F_SUCCESS?generation=1645614930849546&alt=media',
  'name': '<redacted-bucket-name>/partition/dir/_SUCCESS',
  'bucket': '<redacted-bucket-name>',
  'generation': '1645614930849546',
  'metageneration': '1',
  'contentType': 'application/octet-stream',
  'storageClass': 'STANDARD',
  'size': 0,
  'md5Hash': '1B2M2Y8AsgTpgAmY7PhCfg==',
  'crc32c': 'AAAAAA==',
  'etag': 'CIrej6vZlfYCEAE=',
  'timeCreated': '2022-02-23T11:15:30.851Z',
  'updated': '2022-02-23T11:15:30.851Z',
  'timeStorageClassUpdated': '2022-02-23T11:15:30.851Z',
  'type': 'file'},
 '<redacted-bucket-name>/partition/dir/part-00000-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet': {'kind': 'storage#object',
  'id': '<redacted-bucket-name>/partition/dir/part-00000-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet/1645614930433213',
  'selfLink': 'https://www.googleapis.com/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2Fpart-00000-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet',
  'mediaLink': 'https://storage.googleapis.com/download/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2Fpart-00000-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet?generation=1645614930433213&alt=media',
  'name': '<redacted-bucket-name>/partition/dir/part-00000-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet',
  'bucket': '<redacted-bucket-name>',
  'generation': '1645614930433213',
  'metageneration': '1',
  'contentType': 'application/octet-stream',
  'storageClass': 'STANDARD',
  'size': 993,
  'md5Hash': 'U/QwH2SM2un91crFW2lqWA==',
  'crc32c': '1iQeEw==',
  'etag': 'CL2p9qrZlfYCEAE=',
  'timeCreated': '2022-02-23T11:15:30.435Z',
  'updated': '2022-02-23T11:15:30.435Z',
  'timeStorageClassUpdated': '2022-02-23T11:15:30.435Z',
  'type': 'file'},
 '<redacted-bucket-name>/partition/dir/part-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet': {'kind': 'storage#object',
  'id': '<redacted-bucket-name>/partition/dir/part-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet/1645614930179084',
  'selfLink': 'https://www.googleapis.com/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2Fpart-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet',
  'mediaLink': 'https://storage.googleapis.com/download/storage/v1/b/<redacted-bucket-name>/o/partition%2Fdir%2Fpart-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet?generation=1645614930179084&alt=media',
  'name': '<redacted-bucket-name>/partition/dir/part-00001-d2924cb4-ae44-4401-8bfd-b67a4c9ab9ce-c000.snappy.parquet',
  'bucket': '<redacted-bucket-name>',
  'generation': '1645614930179084',
  'metageneration': '1',
  'contentType': 'application/octet-stream',
  'storageClass': 'STANDARD',
  'size': 993,
  'md5Hash': 'b5UF9wxWOK9lzAl9HnSMjw==',
  'crc32c': '1Es0QA==',
  'etag': 'CIzo5qrZlfYCEAE=',
  'timeCreated': '2022-02-23T11:15:30.188Z',
  'updated': '2022-02-23T11:15:30.188Z',
  'timeStorageClassUpdated': '2022-02-23T11:15:30.188Z',
  'type': 'file'}}
salman1993 commented 1 year ago

We are also facing a similar issue: we have a Hive-style partitioned parquet dataset written with Spark, and we cannot load it with pyarrow (using gcsfs as the filesystem). We get a FileNotFoundError when we run:

import gcsfs
import pyarrow.parquet as pq

# `path` is the (redacted) gs:// directory of the partitioned dataset
pq_ds = pq.ParquetDataset(
    path,
    filesystem=gcsfs.GCSFileSystem(),
    pre_buffer=False,
    use_legacy_dataset=False,
    partitioning="hive",
)

Error:

Traceback (most recent call last):
  File "/Users/smohammed/Development/playground/read_parquet/benchmark_pyarrow_gcs.py", line 35, in <module>
    pq_ds = pq.ParquetDataset(
  File "/Users/smohammed/.pyenv/versions/def/lib/python3.9/site-packages/pyarrow/parquet/__init__.py", line 1663, in __new__
    return _ParquetDatasetV2(
  File "/Users/smohammed/.pyenv/versions/def/lib/python3.9/site-packages/pyarrow/parquet/__init__.py", line 2351, in __init__
    self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
  File "/Users/smohammed/.pyenv/versions/def/lib/python3.9/site-packages/pyarrow/dataset.py", line 694, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "/Users/smohammed/.pyenv/versions/def/lib/python3.9/site-packages/pyarrow/dataset.py", line 449, in _filesystem_dataset
    return factory.finish(schema)
  File "pyarrow/_dataset.pyx", line 1857, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1190, in pyarrow._fs._cb_open_input_file
  File "/Users/smohammed/.pyenv/versions/def/lib/python3.9/site-packages/pyarrow/fs.py", line 400, in open_input_file
    raise FileNotFoundError(path)
FileNotFoundError: <redacted>/benchmark_pq_data/small/

We can also confirm that the files do exist in GCS and that we can load individual files using pq.read_table(...).
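For completeness, a minimal sketch of that check; the path is a placeholder for the redacted prefix and the file listing is illustrative:

import gcsfs
import pyarrow.parquet as pq

gcs = gcsfs.GCSFileSystem()

# Listing the directory shows the part files are present...
files = [p for p in gcs.find("<redacted>/benchmark_pq_data/small/") if p.endswith(".parquet")]

# ...and reading any one of them directly succeeds; only the
# directory-level dataset discovery raises FileNotFoundError.
table = pq.read_table(files[0], filesystem=gcs)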

ahringer commented 8 months ago

Having the same issue in Azure ML Studio when reading a Spark DataFrame written with a single partition (.../foo=1/). Changing ONLY the engine to fastparquet results in no error:

mydata = pd.read_parquet("abfs://container/path/", engine="fastparquet")

felipecrv commented 7 months ago

@felipecrv (tagging myself so I get this on my "Participating" inbox filter)

pascalwhoop commented 1 month ago

We're also running into this. More absurdly, it fails on the first read, but a retry works every time. So:

for i in range(2):
    try:
        # read() stands in for our dataset read (e.g. pandas read_parquet on the GCS path)
        read()
    except Exception:
        pass

works... so bizarre

Also, this only happens when we (see the sketch below):

  1. write with Spark to a partitioned folder
  2. read with pandas (with both the pyarrow and fastparquet engines, actually)
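A minimal sketch of that write/read pattern; the GCS path and the partition column foo are placeholders:

import pandas as pd

# 1. Write side (PySpark, assuming an existing SparkSession and DataFrame `spark_df`):
#        spark_df.write.partitionBy("foo").parquet("gs://<bucket>/dataset")
#
# 2. Read side (pandas): raises FileNotFoundError on the first attempt,
#    but succeeds on a retry, with either engine.
pdf = pd.read_parquet("gs://<bucket>/dataset", engine="pyarrow")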