pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

read_parquet and scan_parquet from S3 lead to exceptions or hanging #11372

Closed braaannigan closed 1 year ago

braaannigan commented 1 year ago

Reproducible example

import polars as pl
s3path = "s3://braaannigan/bigtest.pq"
# This works when no wildcard in filepath
pl.read_parquet(s3path)
# ComputeError: Generic S3 error: Missing region
pl.scan_parquet(s3path)
# This normally hangs and takes a while to keyboard interrupt
pl.scan_parquet(s3path, storage_options={"region": "us-east-2"})

Log output

ComputeError                              Traceback (most recent call last)
Cell In[9], line 1
----> 1 pl.scan_parquet(s3path)

File lib/python3.10/site-packages/polars/io/parquet/functions.py:254, in scan_parquet(source, n_rows, cache, parallel, rechunk, row_count_name, row_count_offset, storage_options, low_memory, use_statistics, hive_partitioning)
    251 if isinstance(source, (str, Path)):
    252     source = normalize_filepath(source)
--> 254 return pl.LazyFrame._scan_parquet(
    255     source,
    256     n_rows=n_rows,
    257     cache=cache,
    258     parallel=parallel,
    259     rechunk=rechunk,
    260     row_count_name=row_count_name,
    261     row_count_offset=row_count_offset,
    262     storage_options=storage_options,
    263     low_memory=low_memory,
    264     use_statistics=use_statistics,
    265     hive_partitioning=hive_partitioning,
    266 )

File lib/python3.10/site-packages/polars/lazyframe/frame.py:426, in LazyFrame._scan_parquet(cls, source, n_rows, cache, parallel, rechunk, row_count_name, row_count_offset, storage_options, low_memory, use_statistics, hive_partitioning)
    423     storage_options = list(storage_options.items())  #  type: ignore[assignment]
    425 self = cls.__new__(cls)
--> 426 self._ldf = PyLazyFrame.new_from_parquet(
    427     source,
    428     n_rows,
    429     cache,
    430     parallel,
    431     rechunk,
    432     _prepare_row_count_args(row_count_name, row_count_offset),
    433     low_memory,
    434     cloud_options=storage_options,
    435     use_statistics=use_statistics,
    436     hive_partitioning=hive_partitioning,
    437 )
    438 return self

ComputeError: Generic S3 error: Missing region

Issue description

I don't think my AWS settings are causing this, but I'm trying to understand what pl.read_parquet and pl.scan_parquet do differently.

My default AWS profile region is eu-west-2, whereas the file is in us-east-2. Interestingly, read_parquet doesn't mind this, but scan_parquet needs the region to be specified.

Expected behavior

A LazyFrame for the file path

Installed versions

```
Polars:              0.19.5
Index type:          UInt32
Platform:            macOS-12.6.7-x86_64-i386-64bit
Python:              3.11.3 (v3.11.3:f3909b8bc8, Apr 4 2023, 20:12:10) [Clang 13.0.0 (clang-1300.0.29.30)]
----Optional dependencies----
adbc_driver_sqlite:
cloudpickle:
connectorx:          0.3.2_alpha.2
deltalake:
fsspec:
gevent:
matplotlib:          3.7.1
numpy:               1.24.3
openpyxl:
pandas:              1.5.3
pyarrow:             12.0.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:            0.8.1
xlsxwriter:
```
zippeurfou commented 1 year ago

I have similar issues. Adding a bit more context for my case.

import polars as pl
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()
bucket = "grubhub-gdp-ods-data-assets-prod"
path = "path/file.parquet"
dataset = pq.ParquetDataset(f"s3://{bucket}/{path}", filesystem=fs)
df_one = pl.from_arrow(dataset.read())

This works as expected.

ds1 = pl.read_parquet(f"s3://{bucket}/{path}") # works as expected
ds2 = pl.scan_parquet(f"s3://{bucket}/{path}") # ComputeError: Generic S3 error: Missing region

Now if I try something else

scan_path = "path/*.parquet"
ds1_scan = pl.read_parquet(f"s3://{bucket}/{scan_path}") # This actually gets me the first file somehow but does not read the others
ds2_scan = pl.scan_parquet(f"s3://{bucket}/{scan_path}") # ComputeError: Generic S3 error: Missing region

Alternatively @braaannigan what you can do is this:

import s3fs
import polars as pl
import pyarrow.dataset as ds

fs = s3fs.S3FileSystem()
path_dir = "path_dir"
s3_endpoint = f"s3://{bucket}/{path_dir}" 

myds = ds.dataset([y for y in fs.ls(s3_endpoint) if ".parquet" in y], filesystem=fs)
lazy_df = pl.scan_pyarrow_dataset(myds)
df = lazy_df.select(pl.col('*')).collect()

Edit: specifying the region, i.e. `ds1 = pl.scan_parquet(s3path, storage_options={"region": "us-east-2"})`, worked for me, checking with `print(ds1.select(pl.col('*')).collect())`. One thing I did notice is that this is pretty slow. I would have expected it to be slow the first time because of the IO, but, for example, if I do:

print(ds1.select(pl.col('*')).collect())
# Then
print(ds1.select(pl.col('*')).limit(100).collect())
# The second iteration is still very slow.

My problem with that approach is that I wanted to try the new hive feature and I don't know how to use it that way :(

ritchie46 commented 1 year ago

We have only exposed it on scan_parquet so far. Missing region is an error that should indicate what you need to define, though. ;) The region argument is required.

One thing I did notice is that this is pretty slow.

We are loading from the cloud. How does it perform relative to other approaches? That would be more useful to know.
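For a sense of what a relative comparison could look like, something along these lines would do (a rough sketch reusing the pyarrow/s3fs route from the comment above; bucket, path, and region are placeholders, and this is not a rigorous benchmark):

```
import time

import polars as pl
import pyarrow.parquet as pq
import s3fs

s3_path = "s3://<bucket>/<path>/file.parquet"  # placeholder

# Native scan_parquet (object_store) route.
t0 = time.perf_counter()
native_df = pl.scan_parquet(s3_path, storage_options={"region": "<region>"}).collect()
t1 = time.perf_counter()

# fsspec/pyarrow route used earlier in the thread.
fs = s3fs.S3FileSystem()
arrow_df = pl.from_arrow(pq.ParquetDataset(s3_path, filesystem=fs).read())
t2 = time.perf_counter()

print(f"native scan_parquet: {t1 - t0:.2f}s, pyarrow + s3fs: {t2 - t1:.2f}s")
```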

ritchie46 commented 1 year ago

@braaannigan read_parquet still uses the old fsspec utility. For the new native solution we need region to be defined (for now).

This works for me:

q1 = pl.scan_parquet("s3://nyc-tlc/trip data/fhv_tripdata_2015-01.parquet", storage_options={"region": 'us-east-1'})
q1.head().collect()
braaannigan commented 1 year ago

@braaannigan read_parquet still uses the old fsspec utility. For the new native solution we need region to be defined (for now).

This works for me:

q1 = pl.scan_parquet("s3://nyc-tlc/trip data/fhv_tripdata_2015-01.parquet", storage_options={"region": 'us-east-1'})
q1.head().collect()

This doesn't work for me. Would you mind sharing your show_versions output?

The error I get is "ComputeError: Generic S3 error: response error "request error", after 10 retries: error sending request for url (http://169.254.169.254/latest/api/token): error trying to connect: tcp connect error: Host is down (os error 64)"

braaannigan commented 1 year ago

@ritchie46 Wait, this was my problem. I've added the AWS access key and secret key explicitly and the query works. Not sure why they weren't picked up from my default profile, though.
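For reference, "explicitly" here means something like the following (a minimal sketch; the key names follow object_store's S3 options and are illustrative rather than authoritative):

```
import polars as pl

lf = pl.scan_parquet(
    "s3://braaannigan/bigtest.pq",
    storage_options={
        "aws_access_key_id": "<ACCESS_KEY_ID>",
        "aws_secret_access_key": "<SECRET_ACCESS_KEY>",
        "region": "us-east-2",
    },
)
print(lf.head().collect())
```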

Shall we close this?

I'm going to work out the docstrings a bit

braaannigan commented 1 year ago

@ritchie46 Where does scan_parquet infer the default AWS profile credentials/config from?

ritchie46 commented 1 year ago

This is what happens in object_store.

We can make things more user friendly with all this input. I believe we can auto-infer the region from the bucket and lower the number of retries so you get an error sooner.
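For what it's worth, the kind of region inference meant here can be illustrated roughly like this (just the general idea, not what polars/object_store does internally; S3 reports a bucket's region in the x-amz-bucket-region response header, even on 301/403 responses):

```
import urllib.error
import urllib.request

def infer_bucket_region(bucket: str) -> str | None:
    """Return the bucket's region from the x-amz-bucket-region header."""
    req = urllib.request.Request(f"https://{bucket}.s3.amazonaws.com", method="HEAD")
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.headers.get("x-amz-bucket-region")
    except urllib.error.HTTPError as err:
        # S3 includes the header even on error responses such as 301/403.
        return err.headers.get("x-amz-bucket-region")

print(infer_bucket_region("nyc-tlc"))  # bucket from the example above
```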

ritchie46 commented 1 year ago

@braaannigan does exporting your secret as env vars not work?

braaannigan commented 1 year ago

@braaannigan does exporting your secret as env vars not work?

It does, but obviously it's not good practice from a security point of view.
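For completeness, the env-var route looks roughly like this (standard AWS variable names; the region is still passed via storage_options as elsewhere in the thread, and hard-coding secrets like this is exactly the security concern mentioned above):

```
import os

import polars as pl

# Standard AWS credential environment variables; exporting them in the shell
# is equivalent, this just keeps the example self-contained.
os.environ["AWS_ACCESS_KEY_ID"] = "<ACCESS_KEY_ID>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<SECRET_ACCESS_KEY>"

lf = pl.scan_parquet(
    "s3://braaannigan/bigtest.pq",
    storage_options={"region": "us-east-2"},  # region still passed explicitly
)
print(lf.head().collect())
```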

zippeurfou commented 1 year ago

Thanks for the answer.

How does it perform relative to other approaches? That would be more useful to know.

I can do a quick benchmark in a few hours. Do you want me to open a different issue, or is here fine? For context, my hypothesis was that the first "scan" would be slow but a second one on the same dataset would be fast, as I assumed some caching would happen after the first collect call in my previous example. Maybe that is not accurate.

igoichuk commented 1 year ago

Experiencing the same problem. Rolling back to 0.19.2 helped.

braaannigan commented 1 year ago

Experiencing the same problem. Rolling back to 0.19.2 helped.

Which problem have you had? Have you tried specifying keys in the storage_options dict?

igoichuk commented 1 year ago

I get the Missing region error if no region is specified, and it hangs if the region is specified. This is on version 0.19.5.

I do not need to specify the region in storage_options (or anywhere else) in version 0.19.2. I do specify a profile value in storage_options when needed.

braaannigan commented 1 year ago

I get the Missing region error if no region is specified, and it hangs if the region is specified. This is on version 0.19.5.

I do not need to specify the region in storage_options (or anywhere else) in version 0.19.2. I do specify a profile value in storage_options when needed.

AFAIK, with the changes to the backend you need to specify the region now. This may change in future releases.

zippeurfou commented 1 year ago

I can do a quick benchmark in a few hours.

OK, so I did some benchmarking and it is not significantly slower for small datasets. The issues arise when you have a lot of files in a hive format: scan_parquet is just very slow. This might come from me, though, and my (lack of) understanding of how to construct the path. My file format is as follows:

s3://<bucket>/<database_name>/<table_name>/<partition_name>=<partition_value>/batch_id=<batch_id_val>/<partition>.parquet

So I have tried s3://<bucket>/<database_name>/<table_name>/ as my path, but this errors with:

Object at location <database_name>/<table_name> not found: response error "No Body", after 0 retries: HTTP status client error (404 Not Found) for url (https://s3.<region>.amazonaws.com/<bucket>/<database_name>/<table_name>)

Alternatively, I have done s3://<bucket>/<database_name>/<table_name>/**/**/*.parquet and this works, but it is really slow because there are a lot of files. What is the correct format @ritchie46?

marcosmoova commented 1 year ago

The credentials should be picked up from the ~/.aws/credentials file. Usually, if they are set as environment variables, those take precedence; if they are not present, the AWS default is to pick them up from the ~/.aws/credentials file. I assume that is why read_parquet works and scan_parquet does not in my case, since I don't have the environment variables set.
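Until the profile file is picked up natively, one workaround is to read the default profile manually and forward it through storage_options (a sketch; the bucket path is a placeholder and the option key names are assumed from object_store):

```
import configparser
import os

import polars as pl

# Read the [default] profile from the standard credentials file and forward
# the keys to scan_parquet's storage_options.
cfg = configparser.ConfigParser()
cfg.read(os.path.expanduser("~/.aws/credentials"))
profile = cfg["default"]

lf = pl.scan_parquet(
    "s3://<bucket>/<path>/file.parquet",  # placeholder
    storage_options={
        "aws_access_key_id": profile["aws_access_key_id"],
        "aws_secret_access_key": profile["aws_secret_access_key"],
        "region": "<region>",
    },
)
```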

erikcw commented 1 year ago

I'm getting a similar error since upgrading to polars 0.19.6 today.

In my case I'm using an AWS Role to provide authentication:

# env | grep AWS
AWS_DEFAULT_REGION=us-east-1
AWS_REGION=us-east-1
AWS_ROLE_ARN=arn:aws:iam::17************:role/eksctl-jde-cluster-addon-iamserviceaccount-l-Role1-*************
AWS_WEB_IDENTITY_TOKEN_FILE=/var/run/secrets/eks.amazonaws.com/serviceaccount/token
AWS_STS_REGIONAL_ENDPOINTS=regional
  File "/opt/app/utils.py", line 83, in already_processed
    df = pl.scan_parquet(s3_path)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app/.venv/lib/python3.11/site-packages/polars/io/parquet/functions.py", line 258, in scan_parquet
    return pl.LazyFrame._scan_parquet(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app/.venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 427, in _scan_parquet
    self._ldf = PyLazyFrame.new_from_parquet(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
exceptions.ComputeError: Generic S3 error: response error "request error", after 0 retries: error sending request for url (https://sts.us-east-1.amazonaws.com/?Action=AssumeRoleWithWebIdentity&DurationSeconds=3600&RoleArn=arn%3Aaws%3Aiam%3A%3A17691******2%3Arole%2Feksctl-jde-cluster-addon-iamserviceaccount-l-Role1-***********3&RoleSessionName=WebIdentitySession&Version=2011-06-15&WebIdentityToken=**********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************): error trying to connect: invalid peer certificate: BadSignature

Rolled back to 0.19.2 and it started working again.

ritchie46 commented 1 year ago

I don't think we support that; you must either set the proper env vars or pass them explicitly.

ritchie46 commented 1 year ago

The issues arise when you have a lot of files in a hive format: scan_parquet is just very slow.

I tried to mitigate this in 0.19.6. Can you try again?

zippeurfou commented 1 year ago

The issues arise when you have a lot of files in a hive format: scan_parquet is just very slow.

I tried to mitigate this in 0.19.6. Can you try again?

Yes, I confirm it is way faster in terms of the scan operation. What I did notice is that once I call collect, it takes a while before all the cores are used. Might that be because finding the partitions takes a while?

[Screenshot 2023-10-04 at 9:57:42 AM]

For context this is the code I am using:

s3_path = "s3://<bucket>/<db>/<table_name>/**/**/*.parquet"
# format "s3://<bucket>/<db>/<table_name>/start_date=<YYY-MM-DD>/batch_id=<number>/<file>.snappy.parquet"
ds1 = pl.scan_parquet(s3_path,storage_options={"region":"us-east-1"},hive_partitioning=True) #way faster now :)
query = """
SELECT
            my_col
FROM frame
WHERE start_date = '2023-04-01'
LIMIT 100
"""
res = pl.SQLContext(frame=ds1).execute(
   query
).collect() #somehow use single cpu for a while and is still pretty slow.

start_date is my partition column. Sadly I can't provide a reproducible example right now.
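For comparison, a rough sketch of the same query in the native lazy API, continuing from the snippet above (this should let the filter on the hive partition column prune files; if start_date is read back as a Date rather than a string, compare against a datetime.date instead):

```
res = (
    ds1
    .filter(pl.col("start_date") == "2023-04-01")  # partition-column filter
    .select("my_col")
    .limit(100)
    .collect()
)
```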

ritchie46 commented 1 year ago

This has been fixed now.