aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
https://aws-sdk-pandas.readthedocs.io
Apache License 2.0

Merge into not working with athena.to_iceberg using array columns #3018

Status: Open. aockel opened this issue 1 week ago.


Describe the bug

When I use the merge functionality of athena.to_iceberg (the merge_cols parameter), the MERGE INTO queries it runs on AWS Athena time out.

My dataset contains a column of type array(string) that appears in both the filter predicate (the ON condition) and the UPDATE SET clause of the MERGE INTO statement that awswrangler generates.

After I opened an AWS Support case, the Athena feature team sent the following response:

"The internal team were able to get merge query working by explicitly casting the value in the filter predicate. The timeout issue only occurs when a column type ARRAY, MAP, or ROW is used in the filter predicate or when we set the new fields of that column type in the query using UPDATE SET. The awswrangler library does not generate a query with the casts which causes the merge query to timeout. As a workaround we suggest of creating a merge query with explicit casts in the filter predicates and UPDATE SET parts of the query instead of using the to_iceberg function."

It might be possible for awswrangler to add explicit type casts for the columns listed in merge_cols. Another option would be to take the cast types from the dtype argument when the user provides it.
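The workaround the Athena team describes can be sketched as a small helper that builds the MERGE INTO statement with explicit CASTs on the source values in the ON condition, the UPDATE SET clause and the INSERT values. This is a minimal sketch, not awswrangler's API: `build_merge_query`, `hive_to_trino_type` and the `target`/`source` aliases are hypothetical. It assumes the DataFrame has already been written to a staging table (`source_table`) in the same database, and it maps only a handful of Hive-style `dtype` strings to the Trino type names used by Athena engine v3 (map and struct types are not handled). Whether casting only the source side is enough to avoid the timeout is also an assumption.

```python
# Hypothetical mapping from Hive/Glue-style dtype strings (as passed to
# awswrangler's `dtype` argument) to the Trino type names that Athena
# engine v3 accepts in CAST(... AS ...). Only a few scalar types are covered.
_SCALAR_TYPES = {
    "string": "varchar",
    "int": "integer",
    "bigint": "bigint",
    "double": "double",
    "boolean": "boolean",
}


def hive_to_trino_type(hive_type: str) -> str:
    """Translate e.g. 'array<string>' into 'array(varchar)'.

    Map and struct types are not handled; unknown names are returned unchanged.
    """
    t = hive_type.strip().lower()
    if t.startswith("array<") and t.endswith(">"):
        # Strip the 6-character "array<" prefix and the trailing ">".
        return f"array({hive_to_trino_type(t[6:-1])})"
    return _SCALAR_TYPES.get(t, t)


def build_merge_query(database, table, source_table, columns, merge_cols, dtype):
    """Hypothetical helper: MERGE INTO with explicit CASTs on the source values."""

    def src(col: str) -> str:
        # Cast the staged value when a type is known for the column.
        if col in dtype:
            return f'CAST(source."{col}" AS {hive_to_trino_type(dtype[col])})'
        return f'source."{col}"'

    on_clause = " AND ".join(f'target."{c}" = {src(c)}' for c in merge_cols)
    update_set = ", ".join(f'"{c}" = {src(c)}' for c in columns)
    insert_cols = ", ".join(f'"{c}"' for c in columns)
    insert_vals = ", ".join(src(c) for c in columns)
    return (
        f'MERGE INTO "{database}"."{table}" target\n'
        f'USING "{database}"."{source_table}" source\n'
        f"ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {update_set}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
```

For the columns in the reproduction script, passing `{"array_attribute": "array<string>", "string_attribute": "string", "integer_attribute": "int"}` as `dtype` casts `array_attribute` to `array(varchar)` in both the ON condition and UPDATE SET. The resulting string could then be submitted with `wr.athena.start_query_execution(sql=query, database=DATABASE, wait=True)`.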

How to Reproduce

To reproduce, include an array column in merge_cols, run the script below two or three times, and monitor the runtime of the MERGE INTO query in Athena. Even with only a few KB of sample data, the query times out after 30 minutes (the maximum runtime in the default workgroup).

import pandas as pd
import awswrangler as wr
import time

# CHANGE ME
DATABASE = 'aws_wrangler_merge'
TABLE = 'aws_wrangler_merge_test'
BUCKET = 'your-test-bucket-name'

# Sample data
data = {
    'array_attribute': ['9eYM','cXFv','9eYM','cXFv'],
    'string_attribute': "astring",
    'integer_attribute': 5
}

# Create DataFrame
df = pd.DataFrame(data)

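# Sort array to ensure we avoid duplicates when merging.
# Note: each value above is a plain string, so sorted() returns a list of its
# characters, e.g. '9eYM' -> ['9', 'M', 'Y', 'e'].
def sort_array(arr):
    return sorted(arr)

df['array_attribute'] = df['array_attribute'].apply(sort_array)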

# Unique temporary S3 prefix for this run
temp_path_time = int(time.time() * 1e9)

# Column types for the Iceberg table (Hive/Glue type names)
dtype = {
    'array_attribute': 'array<string>',
    'string_attribute': 'string',
    'integer_attribute': 'int',
}

# Write data in merge mode, matching existing rows on merge_cols
wr.athena.to_iceberg(
    df=df,
    database=DATABASE,
    table=TABLE,
    table_location=f's3://{BUCKET}/{TABLE}',
    temp_path=f's3://{BUCKET}/temp/{TABLE}_{temp_path_time}',
    schema_evolution=True,
    keep_files=False,
    dtype=dtype,
    merge_cols=['array_attribute', 'string_attribute'],
    merge_match_nulls=False,
)
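Monitoring the MERGE INTO runtime across two or three runs can be partly automated. A minimal sketch, assuming the query metadata is fetched with boto3's Athena client (`list_query_executions`, then `batch_get_query_execution`, which accepts up to 50 IDs per call). The helper `summarize_merge_runs` is hypothetical and only parses the returned dictionaries, so it needs no AWS access itself.

```python
def summarize_merge_runs(query_executions):
    """Return (query_id, state, engine_seconds) for each MERGE query.

    `query_executions` is the "QueryExecutions" list from a boto3 Athena
    batch_get_query_execution response.
    """
    runs = []
    for execution in query_executions:
        # Keep only MERGE statements; other queries (SELECT, DDL) are skipped.
        if not execution.get("Query", "").lstrip().upper().startswith("MERGE"):
            continue
        status = execution.get("Status", {})
        stats = execution.get("Statistics", {})
        millis = stats.get("EngineExecutionTimeInMillis", 0)
        runs.append((execution.get("QueryExecutionId"), status.get("State"), millis / 1000))
    return runs
```

For example, with `athena = boto3.client("athena")` and `ids = athena.list_query_executions(WorkGroup="primary")["QueryExecutionIds"]`, calling `summarize_merge_runs(athena.batch_get_query_execution(QueryExecutionIds=ids[:50])["QueryExecutions"])` lists the recent MERGE queries in the default workgroup with their engine runtimes, which makes it easy to compare runs.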

Expected behavior

The MERGE INTO query should complete without timing out. According to AWS Support, the Athena team is still investigating the root cause, but it is unclear how long a fix will take.


OS

Mac

Python version

3.10

AWS SDK for pandas version

3.9.1
