aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
https://aws-sdk-pandas.readthedocs.io
Apache License 2.0

fix: handle partitions with empty table in read_parquet with dataset=True #2983

Open cournape opened 1 month ago

cournape commented 1 month ago

When reading a set of parquet files with dataset=True, if the first partition is empty, the current logic for dtype inference fails. It raises an exception such as:

pyarrow.lib.ArrowTypeError: Unable to merge: Field col0 has incompatible
types: dictionary<values=null, indices=int32, ordered=0> vs
dictionary<values=string, indices=int32, ordered=0>

To fix this, we filter out empty table(s) before merging them into one parquet file.

Note: I have only run the mock test suite, I can't easily run the suite against actual AWS services.

malachi-constant commented 1 month ago

AWS CodeBuild CI Report

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

jaidisido commented 1 month ago

I am a bit confused about the way you are generating your partitioned dataset. Specifically this bit:

    for i, df in enumerate(dataframes):
        wr.s3.to_parquet(
            df=df,
            path=f"{s3_key}/part{i}.parquet",
        )

You are artificially creating the partitions instead of relying on awswrangler or pandas to do it.

When I rewrite your test with a proper partitioning call, the exception is not raised:

    import awswrangler as wr
    import pandas as pd

    def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
        """Test that a dataset split into multiple parquet files whose first
        partition is an empty table still loads properly.
        """
        s3_key = "s3://bucket/"

        dtypes = {"id": "string[python]"}
        df1 = pd.DataFrame({"id": []}).astype(dtypes)
        df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
        df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)

        # Concatenate first, then add the partition column to the combined frame.
        dataframes = [df1, df2, df3]
        r_df = pd.concat(dataframes, ignore_index=True)
        r_df = r_df.assign(col0=pd.Categorical(["1"] * len(r_df)))

        # Write as a single partitioned dataset, then read it back.
        wr.s3.to_parquet(r_df, path=s3_key, dataset=True, partition_cols=["col0"])

        result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
        pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True)

The difference is that in the code above, the files are written as a single dataset and the metadata for it is preserved.

cournape commented 1 month ago

> The difference being that in the code above, the files are written as a single dataset and the metadata for it is preserved

In that test example, there is no empty table written to s3 since you are concatenating the dataframes before writing to s3.
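This point can be checked with pandas alone. The sketch below reuses the dataframes from the test above and assumes that a partitioned write emits one group of rows per distinct `col0` value; `rows_per_partition` is a name introduced here for illustration.

```python
import pandas as pd

dtypes = {"id": "string[python]"}
df1 = pd.DataFrame({"id": []}).astype(dtypes)          # the empty frame
df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)

# Concatenating first absorbs the empty frame into the combined one.
r_df = pd.concat([df1, df2, df3], ignore_index=True)
r_df = r_df.assign(col0=pd.Categorical(["1"] * len(r_df)))

# Count the rows that would land in each partition value.
rows_per_partition = r_df.groupby("col0", observed=True).size().to_dict()
print(rows_per_partition)
```

Every partition value ends up with at least one row, so no empty table is produced for the reader to merge.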

To trigger the error, the execution needs to go through https://github.com/aws/aws-sdk-pandas/blob/635f6d55b2ad1f77c9033cccee66c14f0b56e91b/awswrangler/_arrow.py#L33 with > 1 table, including one empty.

The exception I am trying to fix is triggered by some real parquet datasets created from Spark. If that helps, I am happy to give details through our internal Slack at Amazon, including the dataset.

cournape commented 1 month ago

To trigger the bug, all of the following must be true:

  1. dataset=True
  2. the dataset being read has more than one parquet file, and the first one read is empty (the order may not matter)
  3. the S3 key contains at least one partition, so that awswrangler adds the corresponding column(s)

When that happens, the underlying cause is the type inference happening here: https://github.com/aws/aws-sdk-pandas/blob/main/awswrangler/_arrow.py#L41.

If a table is not empty, part_value will contain the right "type" for the added partition column. But if the table is empty, you get a type that is independent of the value, e.g.

    >>> pa.array(["1"] * 0).dictionary_encode()
    <pyarrow.lib.DictionaryArray object at 0x7f63e97d9cb0>

    -- dictionary:
    0 nulls
    -- indices:
      []

vs (non-empty table)

    >>> pa.array(["1"] * 1).dictionary_encode()
    <pyarrow.lib.DictionaryArray object at 0x7f63bda8eb20>

    -- dictionary:
      [
        "1"
      ]
    -- indices:
      [
        0
      ]

When both cases happen in the tables list, you get an exception when merging them because of incompatible column types.

cournape commented 1 month ago

@jaidisido is there anything else I could provide to move this PR forward? Happy to add more tests if needed.
