dask / dask-expr

BSD 3-Clause "New" or "Revised" License
79 stars 18 forks source link

Read Parquet exceptions thrown with PyArrow S3FileSystem #1005

Open adampinky85 opened 3 months ago

adampinky85 commented 3 months ago

Describe the issue:

Hi team,

We are intensive users of Dask and it's a great product!

We use Apache Arrow's pyarrow.fs.S3FileSystem in our ecosystem rather than s3fs.S3FileSystem due to performance and deadlock issues that were found during multiprocessing.

We're able to retrieve single files and entire directories of many files successfully. But using either a glob path or a interable of paths the read_parquet API throws exceptions with maximum recursion depth.

If would really helpful if the team can investigate? Many thanks!

Minimal Verifiable Example:

import dask.dataframe
import pyarrow.fs

# pyarrow s3 file system client
arrow_s3 = pyarrow.fs.S3FileSystem(
    access_key=s3_tokens["AccessKeyId"],
    session_token=s3_tokens["SessionToken"],
    secret_key=s3_tokens["SecretAccessKey"],
    region=_REGION,
    scheme="http",
)

# works as expected for both target_1 and target_2
df = dask.dataframe.read_parquet(
    path=f"s3://{bucket}/{key}/target_1.parquet",
    filesystem=arrow_s3,
)

# works as expected for entire folder, numerous parquet files
df = dask.dataframe.read_parquet(
    path=f"s3://{bucket}/{key}",
    filesystem=arrow_s3,
)
df

# glob path raises exception
df = dask.dataframe.read_parquet(
    path=f"s3://{bucket}/{key}/*.parquet",
    filesystem=arrow_s3,
)

# iterable of paths, either tuple or list, raises exception
df = dask.dataframe.read_parquet(
    path=[
        f"s3://{bucket}/{key}/target_1.parquet",
        f"s3://{bucket}/{key}/target_2.parquet",
    ],    
    filesystem=arrow_s3,
)

Exception:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File /env/jupyter/venv/lib/python3.12/site-packages/dask_expr/_core.py:446, in Expr.__getattr__(self, key)
    445 try:
--> 446     return object.__getattribute__(self, key)
    447 except AttributeError as err:

File /usr/local/lib/python3.12/functools.py:995, in cached_property.__get__(self, instance, owner)
    994 if val is _NOT_FOUND:
--> 995     val = self.func(instance)
    996     try:

File /env/jupyter/venv/lib/python3.12/site-packages/dask_expr/io/parquet.py:716, in ReadParquetPyarrowFS.normalized_path(self)
    714 @cached_property
    715 def normalized_path(self):
--> 716     return _normalize_and_strip_protocol(self.path)

File /env/jupyter/venv/lib/python3.12/site-packages/dask_expr/io/parquet.py:1658, in _normalize_and_strip_protocol(path)
   1657 for sep in protocol_separators:
-> 1658     split = path.split(sep, 1)
   1659     if len(split) > 1:

AttributeError: 'list' object has no attribute 'split'

During handling of the above exception, another exception occurred:

...

RecursionError: maximum recursion depth exceeded while calling a Python object
Normalization failed: type=AttributeError args=<unknown>

Environment: OS: Amazon Linux release 2 (Karoo) Linux: 4.14.336-257.566.amzn2.x86_64 Python: 3.12.2

Packages: arrow: 1.3.0 dask: 2024.3.1 dask-expr: 1.0.4 numpy: 1.26.4 pandas: 2.2.1 pyarrow: 15.0.2 pyarrow-hotfix: 0.6 Install method pip

phofl commented 3 months ago

Hi, thanks for your report. We will look into those (already put up a PR to fix the list case).

For context: The Arrow FS now leverages a rewrite of the parquet implementation that's a lot faster than the legacy implementation (a few rough edges are still expected unfortunately)

adampinky85 commented 3 months ago

Great thanks! I believe glob is not supported in Arrrow’s s3 FS - that may be the issue for the glob case. The list case fix is much appreciated 😀

fjetter commented 3 months ago

Yeah, glob patterns are not supported by Arrow FS. I don't have a solution to this yet but at the same time I'm not entirely convinced this is even necessary.

For example, instead of s3://{bucket}/{key}/*.parquet you should be able to use s3://{bucket}/{key}. Accepting a list of files should be fine, though.

adampinky85 commented 3 months ago

Thanks, yes glob pattern example was a trivial to show the exception. In our real use cases, it is useful to load only the targeted subset of files. e.g,, load all files for a specific year, month, and various identifiers. But the list path case fix is great and resolves the main issue.