aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
https://aws-sdk-pandas.readthedocs.io
Apache License 2.0
3.9k stars 696 forks source link

wr.athena.to_iceberg touches a file in the awswrangler default Athena bucket even if s3_output is specified #2766

Closed plbremer closed 5 months ago

plbremer commented 5 months ago

Describe the bug

running something like

import boto3

low_permissions_session = boto3.Session(profile_name='low_permissions_role',region_name='correct-region')

wr.athena.to_iceberg(
    df=pd.DataFrame({'col': [1, 2, 3]}),
    database='my_db',
    table='mhy_table',
    table_location='s3://foo/bar/iceberg/',
    temp_path='s3://foo/bar/iceberg-temp/',
    boto3_session=low_permissions_session,
    s3_output='s3://any-bucket-other-than-awswrangler-default/foo/bar'
)

will still cause awswrangler to write to its preferred default bucket

image

If the session does not have access to the default bucket, the query never completes and the call fails with a WaiterError:

---------------------------------------------------------------------------
WaiterError                               Traceback (most recent call last)
Cell In[20], line 1
----> 1 wr.athena.to_iceberg(
      2 #     df=total_panda,
      3     df=pd.DataFrame({'col': [1, 2, 3]}),
      4     database='altos_lab_data_lake_staging',
      5     table='single_cell_annotations',
      6     table_location='s3://altos-lab-data-lake/demo_directory/single-cell-annotations/iceberg/',
      7     temp_path='s3://altos-lab-data-lake/demo_directory/single-cell-annotations/iceberg-temp/',
      8     boto3_session=lab_data_wrangler_session,
      9 #     s3_output='s3://altos-lab-a2p-commons-metabase-output/'
     10     s3_output='s3://altos-lab-a2p-commons-metabase-output/parkers-directory/'
     11 )

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/_config.py:715, in apply_configs.<locals>.wrapper(*args_raw, **kwargs)
    713         del args[name]
    714         args = {**args, **keywords}
--> 715 return function(**args)

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/_utils.py:178, in validate_kwargs.<locals>.decorator.<locals>.inner(*args, **kwargs)
    175 if condition_fn() and len(passed_unsupported_kwargs) > 0:
    176     raise exceptions.InvalidArgument(f"{message} `{', '.join(passed_unsupported_kwargs)}`.")
--> 178 return func(*args, **kwargs)

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/athena/_write_iceberg.py:392, in to_iceberg(df, database, table, temp_path, index, table_location, partition_cols, merge_cols, keep_files, data_source, s3_output, workgroup, mode, encryption, kms_key, boto3_session, s3_additional_kwargs, additional_table_properties, dtype, catalog_id, schema_evolution, fill_missing_columns_in_df, glue_table_settings)
    387 try:
    388     # Create Iceberg table if it doesn't exist
    389     if not catalog.does_table_exist(
    390         database=database, table=table, boto3_session=boto3_session, catalog_id=catalog_id
    391     ):
--> 392         _create_iceberg_table(
    393             df=df,
    394             database=database,
    395             table=table,
    396             path=table_location,
    397             wg_config=wg_config,
    398             partition_cols=partition_cols,
    399             additional_table_properties=additional_table_properties,
    400             index=index,
    401             data_source=data_source,
    402             workgroup=workgroup,
    403             encryption=encryption,
    404             kms_key=kms_key,
    405             boto3_session=boto3_session,
    406             dtype=dtype,
    407             columns_comments=glue_table_settings.get("columns_comments"),
    408         )
    409     else:
    410         schema_differences, catalog_cols = _determine_differences(
    411             df=df,
    412             database=database,
   (...)
    418             catalog_id=catalog_id,
    419         )

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/athena/_write_iceberg.py:70, in _create_iceberg_table(df, database, table, path, wg_config, partition_cols, additional_table_properties, index, data_source, workgroup, encryption, kms_key, boto3_session, dtype, columns_comments)
     57 table_properties_str: str = (
     58     ", " + ", ".join([f"'{key}'='{value}'" for key, value in additional_table_properties.items()])
     59     if additional_table_properties
     60     else ""
     61 )
     63 create_sql: str = (
     64     f"CREATE TABLE IF NOT EXISTS `{table}` ({cols_str}) "
     65     f"{partition_cols_str} "
     66     f"LOCATION '{path}' "
     67     f"TBLPROPERTIES ('table_type' ='ICEBERG', 'format'='parquet'{table_properties_str})"
     68 )
---> 70 query_execution_id: str = _start_query_execution(
     71     sql=create_sql,
     72     workgroup=workgroup,
     73     wg_config=wg_config,
     74     database=database,
     75     data_source=data_source,
     76     encryption=encryption,
     77     kms_key=kms_key,
     78     boto3_session=boto3_session,
     79 )
     80 wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/athena/_utils.py:96, in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, execution_params, client_request_token, boto3_session)
     92 args: dict[str, Any] = {"QueryString": sql}
     94 # s3_output
     95 args["ResultConfiguration"] = {
---> 96     "OutputLocation": _get_s3_output(s3_output=s3_output, wg_config=wg_config, boto3_session=boto3_session)
     97 }
     99 # encryption
    100 if wg_config.enforced is True:

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/athena/_utils.py:76, in _get_s3_output(s3_output, wg_config, boto3_session)
     74 if wg_config.s3_output is not None:
     75     return wg_config.s3_output
---> 76 return create_athena_bucket(boto3_session=boto3_session)

File ~/anaconda3/lib/python3.11/site-packages/awswrangler/athena/_utils.py:465, in create_athena_bucket(boto3_session)
    463     if err.response["Error"]["Code"] == "OperationAborted":
    464         _logger.debug("A conflicting conditional operation is currently in progress against this resource.")
--> 465 client_s3.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    466 return path

File ~/anaconda3/lib/python3.11/site-packages/botocore/waiter.py:55, in create_waiter_with_client.<locals>.wait(self, **kwargs)
     54 def wait(self, **kwargs):
---> 55     Waiter.wait(self, **kwargs)

File ~/anaconda3/lib/python3.11/site-packages/botocore/waiter.py:388, in Waiter.wait(self, **kwargs)
    383     else:
    384         reason = (
    385             'Max attempts exceeded. Previously accepted state: %s'
    386             % (acceptor.explanation)
    387         )
--> 388     raise WaiterError(
    389         name=self.name,
    390         reason=reason,
    391         last_response=response,
    392     )
    393 time.sleep(sleep_amount)

WaiterError: Waiter BucketExists failed: Max attempts exceeded. Previously accepted state: Matched expected HTTP status code: 404

note that in the case where permissions exist, corresponding to the above image, the output is created as expected in the specified s3 thereafter

image

How to Reproduce

Assume an AWS IAM role that does not have permission to create buckets, in an environment where awswrangler's default bucket does not already exist. Try to create an Iceberg table with an explicit s3_output. awswrangler will complain that the default bucket does not exist.

Expected behavior

No response

Your project

No response

Screenshots

No response

OS

mac 13.5 (22G74)

Python version

3.11.3

AWS SDK for pandas version

3.7.2

Additional context

No response

jaidisido commented 5 months ago

Thanks, it does look like s3_output was not properly trickled down to all underlying methods. Should be fixed by #2767. You can test it by installing the library via the branch:

pip uninstall awswrangler -y
pip install git+https://github.com/awslabs/aws-data-wrangler.git@fix/missing-s3-output
plbremer commented 5 months ago

Thanks, it does look like s3_output was not properly trickled down to all underlying methods. Should be fixed by #2767. You can test it by installing the library via the branch:

pip uninstall awswrangler -y
pip install git+https://github.com/awslabs/aws-data-wrangler.git@fix/missing-s3-output

thanks for the super rapid fix