s3.to_parquet: Truncated Parquet file created in S3 after CreateMultipartUpload error #2987

Closed rdwebster closed 1 month ago

rdwebster commented 1 month ago

Describe the bug

We are using the s3.to_parquet function to populate data files in our S3 Data Lake. In heavy traffic situations, we started noticing errors about Invalid Parquet files when querying the Data Lake with Athena.

These cases line up closely with CreateMultipartUpload (SlowDown) errors from S3 when data is being written by s3.to_parquet. When we examine the indicated invalid Parquet files, they always appear to be a little over 5 MB in size.

The error we see intermittently from S3 when using s3.to_parquet: "An error occurred (SlowDown) when calling the CreateMultipartUpload operation (reached max retries: 5): Please reduce your request rate."
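
As an aside, the SlowDown errors themselves can probably be made rarer by giving the underlying botocore client a larger, adaptive retry budget through awswrangler's global config. A minimal sketch, assuming the wr.config.botocore_config hook applies to the S3 client used by s3.to_parquet (the bucket/path and DataFrame are placeholders); this only mitigates the request-rate errors, it does not prevent the truncated files:

```python
import botocore.config
import pandas as pd
import awswrangler as wr

# Hypothetical mitigation: more attempts + adaptive client-side rate limiting,
# so CreateMultipartUpload is less likely to exhaust its retries on SlowDown.
wr.config.botocore_config = botocore.config.Config(
    retries={"max_attempts": 10, "mode": "adaptive"}
)

df = pd.DataFrame({"value": range(1_000)})  # placeholder data
wr.s3.to_parquet(df=df, path="s3://my-bucket/my-prefix/", dataset=True)
```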

Then later, when querying the Data Lake data via Athena, we get errors like this: "HIVE_BAD_DATA: Not valid Parquet file: s3://…/a046b1c2187049fc958042a759f40a54.snappy.parquet expected magic number: PAR1 got: 506f"

We need to remove these broken files from S3 in order to successfully query the Data Lake with Athena.
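
For reference, this is roughly how we sweep the broken objects out of the Data Lake; a sketch with placeholder bucket/prefix names that checks for the trailing PAR1 magic bytes the truncated files are missing:

```python
import boto3

s3 = boto3.client("s3")
bucket, prefix = "my-bucket", "my-prefix/"  # placeholders for our Data Lake location

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if not key.endswith(".parquet"):
            continue
        # A valid Parquet file ends with the 4-byte magic "PAR1"; fetch just
        # the tail of the object with a suffix range request and check it.
        tail = s3.get_object(Bucket=bucket, Key=key, Range="bytes=-4")["Body"].read()
        if tail != b"PAR1":
            print(f"Deleting truncated Parquet object: s3://{bucket}/{key}")
            s3.delete_object(Bucket=bucket, Key=key)
```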

I think I see why the broken Parquet file is getting created...

CreateMultipartUpload is being called when flush() runs with more than 5 MB of data in the buffer. https://github.com/aws/aws-sdk-pandas/blob/cc77561fd8a58e302b47bf787ed3643ab5550e87/awswrangler/s3/_fs.py#L410

As a result of this exception being raised, close() gets called by a finally block. https://github.com/aws/aws-sdk-pandas/blob/cc77561fd8a58e302b47bf787ed3643ab5550e87/awswrangler/s3/_fs.py#L599

In close(), the _parts_count value is still zero (since flush() didn't get far enough to increment it), but _buffer is not empty, so close() calls S3 put_object on the data in the buffer (the first 5+ MB of the Parquet file data). https://github.com/aws/aws-sdk-pandas/blob/cc77561fd8a58e302b47bf787ed3643ab5550e87/awswrangler/s3/_fs.py#L483
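
To make the sequence concrete, here is a heavily simplified toy model of my reading of the flush()/close() interaction in _fs.py (not the actual implementation; names and structure are condensed):

```python
class SimplifiedS3Writer:
    """Toy model of the flush()/close() interaction described above."""

    _MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum multipart part size

    def __init__(self, client, bucket, key):
        self._client = client
        self._bucket = bucket
        self._key = key
        self._buffer = b""
        self._parts_count = 0

    def flush(self):
        if len(self._buffer) >= self._MIN_PART_SIZE and self._parts_count == 0:
            # If this call raises SlowDown, _parts_count is never incremented,
            # but _buffer still holds the first 5+ MB of Parquet data.
            self._client.create_multipart_upload(Bucket=self._bucket, Key=self._key)
            self._parts_count += 1  # never reached when the call above fails

    def close(self):
        # Reached from a finally block after flush() raised: _parts_count is
        # still 0 and _buffer is non-empty, so the partial data gets written
        # as a plain object -> the ~5 MB truncated Parquet file Athena rejects.
        if self._parts_count == 0 and self._buffer:
            self._client.put_object(Bucket=self._bucket, Key=self._key, Body=self._buffer)
```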

How to Reproduce

I was able to reproduce the issue by downloading the awswrangler code and modifying the flush() code in _fs.py to raise an exception just before the point where 'create_multipart_upload' would be invoked. Then, after calling s3.to_parquet on a large DataFrame, the exception was raised as expected, but a new 5+ MB file still appeared in S3.
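
An equivalent way to reproduce without editing the library source is to force the CreateMultipartUpload API call to fail. A sketch using a botocore monkeypatch; the bucket name and DataFrame contents are placeholders:

```python
from unittest.mock import patch

import numpy as np
import pandas as pd
import awswrangler as wr
from botocore.client import BaseClient
from botocore.exceptions import ClientError

_real_make_api_call = BaseClient._make_api_call


def _failing_make_api_call(self, operation_name, api_params):
    # Simulate the S3 SlowDown error for CreateMultipartUpload only.
    if operation_name == "CreateMultipartUpload":
        raise ClientError(
            {"Error": {"Code": "SlowDown", "Message": "Please reduce your request rate."}},
            operation_name,
        )
    return _real_make_api_call(self, operation_name, api_params)


# Large enough that the serialized Parquet data exceeds the 5 MB buffer.
df = pd.DataFrame({"x": np.random.rand(2_000_000)})

with patch.object(BaseClient, "_make_api_call", _failing_make_api_call):
    # Raises the simulated SlowDown ClientError, yet a ~5 MB truncated object
    # is left behind at the target path.
    wr.s3.to_parquet(df=df, path="s3://my-bucket/test/truncated.parquet")
```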

Expected behavior

No file should be created in S3 if the upload fails due to errors from S3.
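
One possible shape of the expected behaviour, sketched against the simplified toy model above rather than the real _fs.py: flush() records that the multipart upload never started, and close() skips the put_object fallback in that case.

```python
class GuardedS3Writer(SimplifiedS3Writer):
    """Sketch: never fall back to put_object after a failed flush()."""

    _upload_failed = False

    def flush(self):
        try:
            super().flush()
        except Exception:
            # Remember that the multipart upload never started, so close()
            # does not write the partial buffer as a truncated object.
            self._upload_failed = True
            raise

    def close(self):
        if self._upload_failed:
            return  # leave nothing behind in S3
        super().close()
```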

Your project

No response

Screenshots

No response

OS

Mac

Python version

3.11

AWS SDK for pandas version

3.9.1

Additional context

Possibly related to: https://github.com/aws/aws-sdk-pandas/issues/2066