aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
https://aws-sdk-pandas.readthedocs.io
Apache License 2.0

InvalidSchemaConvergence with redshift.copy #474

Closed dirkgomez closed 3 years ago

dirkgomez commented 3 years ago

AWS Data Wrangler 2.0.0, used to work in 1.X

Below is my code snippet, which eventually throws an InvalidSchemaConvergence. The column scan_date is defined as a string in the Glue table, but as the name implies it contains timestamps or empty strings (and possibly more). The dataframe correctly lists scan_date as a string.

It seems there is some data type inference going on which decides that the column contains both (incompatible) data types and then throws an error. How can I circumvent this behaviour?


    df = wr.athena.read_sql_query(
        generate_sql(f"{sql}.sql", table, load_date), database=database,
    )

    path = f"{os.getenv('ATHENA_BUCKET')}prozesszeitenloader/"
    con = wr.redshift.connect("reporting")

    wr.redshift.copy(
        df=df,
        path=path,
        con=con,
        schema="public",
        table=sql,
        mode=mode,
        iam_role=os.getenv("IAM_ROLE"),
        primary_keys=["request_id"],
    )

This is the traceback:

Traceback (most recent call last):
  File "copy_from_s3_to_redshift.py", line 276, in <module>
    handler(
  File "copy_from_s3_to_redshift.py", line 78, in handler
    copy_to_redshift(
  File "copy_from_s3_to_redshift.py", line 61, in copy_to_redshift
    wr.redshift.copy(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/redshift.py", line 1190, in copy
    copy_from_files(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/redshift.py", line 1015, in copy_from_files
    created_table, created_schema = _create_table(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/redshift.py", line 197, in _create_table
    redshift_types = _redshift_types_from_path(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/redshift.py", line 137, in _redshift_types_from_path
    athena_types, _ = s3.read_parquet_metadata(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/_config.py", line 361, in wrapper
    return function(**args)
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/s3/_read_parquet.py", line 803, in read_parquet_metadata
    return _read_parquet_metadata(
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/s3/_read_parquet.py", line 152, in _read_parquet_metadata
    columns_types: Dict[str, str] = _merge_schemas(schemas=schemas)
  File "/Users/dirk/Documents/Code/reports/venv/lib/python3.8/site-packages/awswrangler/s3/_read_parquet.py", line 117, in _merge_schemas
    raise exceptions.InvalidSchemaConvergence(
awswrangler.exceptions.InvalidSchemaConvergence: Was detect at least 2 different types in column scan_date (timestamp and string).
dirkgomez commented 3 years ago

I tried the following:

    athena_types, _ = s3.read_parquet_metadata(
        path=path,
        sampling=0.000001, #parquet_infer_sampling,
        dataset=False,
        use_threads=use_threads,
        boto3_session=session,
        s3_additional_kwargs=s3_additional_kwargs,
    )

Type inference then (accidentally) picks up what I want, but then I run into this:

redshift_connector.error.ProgrammingError: {'S': 'ERROR', 'C': 'XX000', 'M': 'Spectrum Scan Error', 'D': "\n  -----------------------------------------------\n  error:  Spectrum Scan Error\n  code:      15001\n  context:   File 'https://s3.eu-central-1.amazonaws.com/aws-athena-query-results/12e881fb23314e3691d480377d96808d_0.snappy.parquet  has an incompatible Parquet schema for column 's3://aws-athena-query-results\n  query:     903188\n  location:  dory_util.cpp:945\n  process:   fetchtask_thread [pid=9748]\n  -----------------------------------------------\n", 'F': '/home/ec2-user/padb/src/sys/xen_execute.cpp', 'L': '8948', 'R': 'pg_throw'}
igorborgest commented 3 years ago

Hi @dirkgomez, thanks for reporting it.

Just for troubleshooting purposes: could you try passing dtype={"scan_date": "string"} to wr.redshift.copy()?
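For reference, roughly like this (a sketch reusing the arguments from your snippet above; only the dtype line is new):

    wr.redshift.copy(
        df=df,
        path=path,
        con=con,
        schema="public",
        table=sql,
        mode=mode,
        iam_role=os.getenv("IAM_ROLE"),
        primary_keys=["request_id"],
        dtype={"scan_date": "string"},  # cast the column explicitly instead of relying on inference
    )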

igorborgest commented 3 years ago

It is really weird that you have mixed types on S3... Are you sure the path you pass to wr.redshift.copy() is clean? The function does not clean up the path for you automatically. If you want to do that, you can run wr.s3.delete_objects(path).

dirkgomez commented 3 years ago

> Hi @dirkgomez, thanks for reporting it.
>
> Just for troubleshooting purposes: could you try passing dtype={"scan_date": "string"} to wr.redshift.copy()?

I tried that; it made no difference. I dumped the output of _merge_schemas (awswrangler/s3/_read_parquet.py, line 117) to stdout and it showed that the mentioned field was being inferred with two different data types.

I found the offending piece of data in the meantime. To reproduce, this should do it: create an S3-backed Glue table with a schema that defines a column as a string, then add timestamp data and then string data to it.
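Something along these lines should trigger it (a rough sketch; bucket and file names are placeholders):

    import pandas as pd
    import awswrangler as wr

    path = "s3://some-bucket/repro/"  # placeholder prefix

    # One Parquet file where scan_date is a real timestamp ...
    wr.s3.to_parquet(
        df=pd.DataFrame({"scan_date": pd.to_datetime(["2020-12-11 15:15:11"])}),
        path=f"{path}file_a.parquet",
    )
    # ... and a second one where the same column is a plain (empty) string.
    wr.s3.to_parquet(
        df=pd.DataFrame({"scan_date": [""]}),
        path=f"{path}file_b.parquet",
    )

    # Merging the metadata of both files finds two incompatible types for
    # scan_date and raises InvalidSchemaConvergence.
    wr.s3.read_parquet_metadata(path=path)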

dirkgomez commented 3 years ago

> It is really weird that you have mixed types on S3... Are you sure the path you pass to wr.redshift.copy() is clean? The function does not clean up the path for you automatically. If you want to do that, you can run wr.s3.delete_objects(path).

What constitutes a clean path?

The code is basically identical and used to run with the last 1.x version (using copy_to_redshift instead of redshift.copy).

igorborgest commented 3 years ago

> What constitutes a clean path?

By "clean path" I mean an S3 prefix without any objects. Could you double-check that the path doesn't have any remaining files?

> The code is basically identical and used to run with the last 1.x version (using copy_to_redshift instead of redshift.copy).

This is the weirdest thing: we didn't change anything related to this type/schema inference. I'm still trying to understand what is going on there.

> I found the offending piece of data in the meantime. To reproduce, this should do it: create an S3-backed Glue table with a schema that defines a column as a string, then add timestamp data and then string data to it.

I think the issue is not with the original table, because you said the intermediate DataFrame is correctly typed as string. So the issue should be related to wr.redshift.copy() itself.

Could you enable logging to check what is going on internally during wr.redshift.copy()?

You should be able to do that with:

    import logging
    logging.basicConfig(level=logging.INFO, format="[%(name)s][%(funcName)s] %(message)s")
    logging.getLogger("awswrangler").setLevel(logging.DEBUG)
    logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL)

or for AWS lambda:

    import logging
    logging.getLogger("awswrangler").setLevel(logging.DEBUG)
dirkgomez commented 3 years ago

I downgraded to 1.10.1 and the loading works again. I'll try to compare the outputs; I cannot upload the log outputs, sorry.

dirkgomez commented 3 years ago

Here's where the logs for 1.10.1 vs. 2.0.1 diverge:

Working:

...
DEBUG:awswrangler.s3._fs:Reading: 172 bytes at 7656
DEBUG:awswrangler.s3._fs:Reading: 172 bytes at 7656
DEBUG:awswrangler.athena._read:type(ret): <class 'pandas.core.frame.DataFrame'>
DEBUG:awswrangler.athena._read:type(ret): <class 'pandas.core.frame.DataFrame'>
INFO:root:Copy to redshift with mode overwrite
INFO:root:Copy to redshift with mode overwrite
DEBUG:awswrangler._data_types:Inferring PyArrow type from column: header_data
...

Not working (bucket name redacted):

...
DEBUG:awswrangler.s3._fs:Fetching: s3://aws-athena-query-results/temp_table_f6d6244a020e4be897619ff3e86ea2b1/20201211_151511_00035_adg2a_ee999af9-da49-49d6-814b-fa6e559fbccb - Range: 0-33831
DEBUG:awswrangler.s3._fs:s3_block_size of -1, enabling one_shot_download.
DEBUG:awswrangler.s3._fs:s3_block_size of -1, enabling one_shot_download.
DEBUG:awswrangler.s3._fs:self._size: 21166
DEBUG:awswrangler.s3._fs:self._size: 21166
DEBUG:awswrangler.s3._fs:self._s3_block_size: -1
DEBUG:awswrangler.s3._fs:self._s3_block_size: -1
DEBUG:awswrangler.s3._fs:Fetching: s3://aws-athena-query-results/temp_table_f6d6244a020e4be897619ff3e86ea2b1/20201211_151511_00035_adg2a_48ceca55-cd84-4292-9c74-a6f210fcbfcd - Range: 0-21166
DEBUG:awswrangler.s3._fs:Fetching: s3://aws-athena-query-results/temp_table_f6d6244a020e4be897619ff3e86ea2b1/20201211_151511_00035_adg2a_48ceca55-cd84-4292-9c74-a6f210fcbfcd - Range: 0-21166
DEBUG:awswrangler.s3._fs:Fetching: s3://aws-athena-query-results/temp_table_f6d6244a020e4be897619ff3e86ea2b1/20201211_151511_00035_adg2a_48ceca55-cd84-4292-9c74-a6f210fcbfcd - Range: 0-21166
DEBUG:awswrangler.s3._fs:Fetching: s3://aws-athena-query-results/temp_table_f6d6244a020e4be897619ff3e86ea2b1/20201211_151511_00035_adg2a_48ceca55-cd84-4292-9c74-a6f210fcbfcd - Range: 0-21166
DEBUG:awswrangler.athena._read:type(ret): <class 'pandas.core.frame.DataFrame'>
DEBUG:awswrangler.athena._read:type(ret): <class 'pandas.core.frame.DataFrame'>
Traceback (most recent call last):
  File "copy_from_s3_to_redshift.py", line 280, in <module>
    handler(
....

To downgrade I changed this:

     path = f"{os.getenv('ATHENA_BUCKET')}prozesszeitenloader/"
-    con = wr.redshift.connect("reporting-fgrdba")
+    engine = wr.catalog.get_engine("reporting-fgrdba")

     mode = "overwrite" if load_date == "all" else "upsert"
     LOG.info(f"Copy to redshift with mode {mode}")
     try:
-        wr.redshift.copy(
+        wr.db.copy_to_redshift(
             df=df,
             path=path,
-            con=con,
+            con=engine,
             schema="public",
             table=sql,
             mode=mode,
             iam_role=os.getenv("IAM_ROLE"),
             primary_keys=["fgr_id"],
             varchar_lengths={"text": 65535},
         )
igorborgest commented 3 years ago

@dirkgomez I think I was on the right track. Now I see the change that is probably affecting your routine.

1. Amazon S3 now delivers strong read-after-write consistency.
2. The Redshift COPY does not require a manifest file anymore to overcome S3 consistency issues.
3. We have replaced the COPY with a manifest by a COPY using only the S3 prefix, to speed up the load (skipping the manifest file creation).
4. To be able to COPY with only the S3 prefix (without a manifest), we need a clean S3 prefix.

So in your case you can just replace the first line of your snippet with the code below:

    path = f"{os.getenv('ATHENA_BUCKET')}prozesszeitenloader/temporary_clean_prefix/"
    wr.s3.delete_objects(path)
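Put together with the rest of your snippet it would look roughly like this (untested, same arguments as before):

    path = f"{os.getenv('ATHENA_BUCKET')}prozesszeitenloader/temporary_clean_prefix/"
    wr.s3.delete_objects(path)  # make sure nothing is left under the staging prefix

    wr.redshift.copy(
        df=df,
        path=path,
        con=con,
        schema="public",
        table=sql,
        mode=mode,
        iam_role=os.getenv("IAM_ROLE"),
        primary_keys=["request_id"],
    )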

Could you give it a try?

dirkgomez commented 3 years ago

It must be magic! It works, thanks a lot.

A clean S3 prefix sounds like I should just generate one per run and then "garbage collect" once in a while?

dirkgomez commented 3 years ago

Also, I do have a hunch of what that means, but I think you should definitely add that to the copy tutorial.

Thanks for the data wrangler in any case.

igorborgest commented 3 years ago

> A clean S3 prefix sounds like I should just generate one per run and then "garbage collect" once in a while?

You don't need to clean it up yourself; just leave the argument keep_files=False (the default) and all the staging files will be deleted automatically after the COPY.

In the end you just need to provide a safe S3 prefix where Wrangler will not find old files. :)
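For example (just an illustration, reusing df, con and sql from your snippet; the uuid-based prefix is only one way to get a fresh prefix per run):

    import os
    import uuid

    # A per-run staging prefix guarantees Wrangler never finds leftovers from
    # previous runs; keep_files=False (the default) removes the staging files
    # automatically once the COPY has finished.
    path = f"{os.getenv('ATHENA_BUCKET')}prozesszeitenloader/{uuid.uuid4()}/"

    wr.redshift.copy(
        df=df,
        path=path,
        con=con,
        schema="public",
        table=sql,
        iam_role=os.getenv("IAM_ROLE"),
        keep_files=False,
    )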

> Also, I do have a hunch of what that means, but I think you should definitely add that to the copy tutorial.

Will do!


Also, for version 2.1.0 I think we should include this wr.s3.delete_objects() inside the function to automatically clean up the path before the COPY. What do you think?

igorborgest commented 3 years ago

Based on @danielwo's idea, the PR above implements an S3 path validation that raises an exception if a dirty path is passed in.
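Conceptually something like this (not the actual code from the PR, just the idea):

    import awswrangler as wr

    def _validate_copy_path(path: str) -> None:
        # Refuse to run the COPY if the staging prefix already contains objects.
        if wr.s3.list_objects(path):
            raise ValueError(
                f"The received S3 path ({path}) is not empty. "
                "Run wr.s3.delete_objects(path) first or pass an empty prefix."
            )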

dirkgomez commented 3 years ago

> A clean S3 prefix sounds like I should just generate one per run and then "garbage collect" once in a while?
>
> You don't need to clean it up yourself; just leave the argument keep_files=False (the default) and all the staging files will be deleted automatically after the COPY.
>
> In the end you just need to provide a safe S3 prefix where Wrangler will not find old files. :)
>
> Also, I do have a hunch of what that means, but I think you should definitely add that to the copy tutorial.
>
> Will do!
>
> Also, for version 2.1.0 I think we should include this wr.s3.delete_objects() inside the function to automatically clean up the path before the COPY. What do you think?

Yes, that chimes well with the higher-level interface of the data wrangler.

dirkgomez commented 3 years ago

I'll close this ticket as there is already another one addressing this issue. Thanks again for the great work.