aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
https://aws-sdk-pandas.readthedocs.io
Apache License 2.0

suggest to add pandas.to_sql() #74

Closed josecw closed 4 years ago

josecw commented 5 years ago

Is it possible to have a capability similar to the function below, i.e. glueContext.write_dynamic_frame.from_jdbc_conf()?

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={
        "preactions": "truncate table target_table;",
        "dbtable": "target_table",
        "database": "redshiftdb"
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4"
)

Currently, the way we do it is (roughly as sketched after the list):

  1. Get the SQL from an S3 file and pass it into pandas.read_sql_athena()
  2. Use SQLAlchemy to execute the preactions SQL; in our case this is a delete before load
  3. Use SQLAlchemy and pandas.to_sql() to append the DataFrame into the Aurora table
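
A minimal sketch of that workflow (my own illustration, not code from this thread; the bucket, key, connection string, and delete condition are placeholders, and it assumes the legacy awswrangler 0.x pandas API):

import boto3
import awswrangler as wr
from sqlalchemy import create_engine, text

# 1. Fetch the SQL text stored in S3 and run it on Athena
s3 = boto3.client("s3")
obj = s3.get_object(Bucket="my-bucket", Key="queries/extract.sql")
sql = obj["Body"].read().decode("utf-8")
df = wr.pandas.read_sql_athena(sql=sql, database="my_database")

# 2. Preaction via SQLAlchemy: delete before load
engine = create_engine("postgresql+pg8000://user:password@aurora-endpoint:5432/mydb")
with engine.begin() as conn:
    conn.execute(text("DELETE FROM target_table WHERE batch_date = CURRENT_DATE"))

# 3. Append the DataFrame into the Aurora table
df.to_sql("target_table", con=engine, if_exists="append", index=False)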
igorborgest commented 4 years ago

Hi @BellyTheMagnificent! Thanks again for one more valuable input!

We just developed an Aurora+Pandas integration, available from version 0.1.4 onwards.

Now you can LOAD (MySQL/PostgreSQL) and UNLOAD (MySQL) to/from Pandas DataFrames using the high-throughput S3 integration:

import awswrangler as wr

# Load a DataFrame into Aurora (MySQL/PostgreSQL) through S3
wr.pandas.to_aurora(
    dataframe=df,
    connection=con,
    schema="...",
    table="..."
)

# Unload a query result from Aurora (MySQL) into a DataFrame
df = wr.pandas.read_sql_aurora(
    sql="SELECT ...",
    connection=con
)

For those operations and databases that do not have S3 integration, please keep using the native Pandas/PySpark APIs:

  1. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
  2. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  3. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
  4. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameWriter.jdbc

Please test it and give us some feedback! 🚀

josecw commented 4 years ago

Encountered the issue below:

pg8000.core.ProgrammingError: {'S': 'ERROR', 'V': 'ERROR', 'C': '22P04', 'M': 'unterminated CSV quoted field', 'W': 'COPY table_A, line 1804405: ""93215","XXXXX","I","AA","93215","X","XXXXX","XXX","\\","2020-01-05 00:00:00.000","XXXX","..."\nSQL statement "copy [schema].[table] from \'/rdsdbdata/extensions/aws_s3/amazon-s3-fifo-29761-20200106T092927Z-0\' with (FORMAT CSV, DELIMITER \',\', QUOTE \'"\', ESCAPE \'\\\')"', 'F': 'copy.c', 'L': '4530', 'R': 'CopyReadAttributesCSV'}

I wonder if this is caused by the \ value in one of the columns.

Still testing. It would be great if it could:

  1. Perform a rollback when the process fails, e.g. with: pg8000.core.ProgrammingError: {'S': 'ERROR', 'V': 'ERROR', 'C': '25P02', 'M': 'current transaction is aborted, commands ignored until end of transaction block', 'F': 'postgres.c', 'L': '1356', 'R': 'exec_parse_message'}
  2. Offer an option to pass a Glue connection name as the connection
igorborgest commented 4 years ago

Hi @BellyTheMagnificent! Really valuable feedback, thank you!

I just implemented all three points you raised in the PR above. What do you think?

  1. Also accepts a Glue connection name instead of only PEP 249 connections
  2. Rolling back in case of failure
  3. Handling special characters (escaping)

Maybe the best way for you to see how it works is through the added tests.

josecw commented 4 years ago

It looks great. Thank you for the help!

I tried replace() before to_aurora(), but it didn't work either:

str_cols = df.select_dtypes(include=['object']).columns
df.loc[:, str_cols] = df.loc[:, str_cols].replace('\\', '\\\\')
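
One possible reason the snippet above has no effect: with plain strings, DataFrame.replace only matches cells whose entire value equals '\', so backslashes inside longer strings are left untouched. A substring-level escape would look more like the sketch below (the frame here is a made-up example):

import pandas as pd

# Made-up frame; "code" holds strings that contain backslashes
df = pd.DataFrame({"code": ["\\", "A\\B", "plain"], "amount": [1.0, 2.0, 3.0]})

str_cols = df.select_dtypes(include=["object"]).columns
for col in str_cols:
    # Plain substring replacement: every single backslash becomes a doubled one
    df[col] = df[col].str.replace("\\", "\\\\", regex=False)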

igorborgest commented 4 years ago

@BellyTheMagnificent let me know if you have more feedbacks in the future!

josecw commented 4 years ago

Having this error:

File "<stdin>", line 1, in <module> File "/home/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 1525, in to_aurora region=region) File "/home/lib64/python3.6/dist-packages/awswrangler/aurora.py", line 169, in load_table cursor.execute(sql) File "/home/lib64/python3.6/dist-packages/pg8000/core.py", line 861, in execute self._c.execute(self, operation, args) File "/home/lib64/python3.6/dist-packages/pg8000/core.py", line 1909, in execute self.handle_messages(cursor) File "/home/lib64/python3.6/dist-packages/pg8000/core.py", line 1976, in handle_messages raise self.error pg8000.core.ProgrammingError: {'S': 'ERROR', 'V': 'ERROR', 'C': 'XX000', 'M': 'HTTP 412. The file has been modified since the import call started.', 'F': 'aws_s3.c', 'L': '394', 'R': 'perform_import'}

josecw commented 4 years ago

The temp CSV file contains double quotes (") for numeric fields, causing the error below:

ERROR: invalid input syntax for type numeric: "" CONTEXT: COPY tbl_a, line 23, column xxxx: ""

igorborgest commented 4 years ago

@BellyTheMagnificent I'm not sure if I discovered the root cause of this error.

Could you check whether your case is covered by this last PR (#104)?

If not, please send a mocked DataFrame and I will add it to our tests.

josecw commented 4 years ago

Will there be a release for this? I can test it. I wasn't sure whether it relates to the first error: pg8000.core.ProgrammingError: {'S': 'ERROR', 'V': 'ERROR', 'C': 'XX000', 'M': 'HTTP 412. The file has been modified since the import call started.', 'F': 'aws_s3.c', 'L': '394', 'R': 'perform_import'}. The float number issue was discovered by manually running aws_common() on Aurora after the first error occurred.

igorborgest commented 4 years ago

@BellyTheMagnificent new release available: https://github.com/awslabs/aws-data-wrangler/releases

josecw commented 4 years ago

Thank you Igor, I've tried the latest release and it still has the same issue with to_aurora().

  1. The error returned from pg8000 becomes the following: Exception: {'S': 'ERROR', 'V': 'ERROR', 'C': 'XX000', 'M': 'HTTP 412. The file has been modified since the import call started.', 'F': 'aws_s3.c', 'L': '394', 'R': 'perform_import'}
  2. The temp CSV file still contains quote chars for numeric fields

I did the following tests:

  1. Run the same query using awswrangler.pandas.read_sql_athena()
  2. Run awswrangler.pandas.to_csv(). The output CSV contains quotes for numeric fields
  3. Run select aws_s3.table_import_from_s3 in Postgres, but receive the error ERROR: invalid input syntax for type numeric: ""
  4. Run pandas.to_csv(path_or_buf="output.csv", quotechar='"', index_label=False, index=False, quoting=csv.QUOTE_MINIMAL, header=False) and execute aws_s3.table_import_from_s3; then it imports successfully (see the sketch after the sample data below)

Below is random sample data (output from awswrangler.to_aurora()):

"0","8986","O","050100","A","SGD","0.0","0000296722","","2020-01-07 00:00:00.000","XXX","2020-01-10 10:34:55.863"
"1","9735","P","010101","R","SGD","1800000.0","0000199396","C","2020-01-07 00:00:00.000","XXX","2020-01-10 10:34:55.863"
"2","9918","P","010101","A","SGD","","0000298592","C","2020-01-07 00:00:00.000","XXX","2020-01-10 10:34:55.863"
"3","9150","O","050100","R","SGD","0.0","0000196380","","2020-01-07 00:00:00.000","XXX","2020-01-10 10:34:55.863"

Below is random sample data (output from df.to_csv()) that successfully imported to Aurora:

8986,O,050100,A,SGD,0.0,0000296722,,2020-01-07 00:00:00.000,XXX,2020-01-10 10:34:55.863
9735,P,010101,R,SGD,1800000.0,0000199396,C,2020-01-07 00:00:00.000,XXX,2020-01-10 10:34:55.863
9918,P,010101,A,SGD,,0000298592,C,2020-01-07 00:00:00.000,XXX,2020-01-10 10:34:55.863
9306,P,010101,R,SGD,,0000198130,C,2020-01-07 00:00:00.000,XXX,2020-01-10 10:34:55.863
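
A rough sketch of test 4 above (the frame, file name, table, bucket, and region here are placeholders, and the upload of the file to S3 is left out):

import csv
import pandas as pd

# Stand-in frame with a numeric column that includes an empty (NULL) value
df = pd.DataFrame({"id": [8986, 9735], "ccy": ["SGD", "SGD"], "amount": [0.0, None]})

# QUOTE_MINIMAL quotes only fields that need it, so numerics (and empty values) stay unquoted
df.to_csv(
    "output.csv",
    quotechar='"',
    quoting=csv.QUOTE_MINIMAL,
    index=False,
    header=False,
)

# After uploading output.csv to S3, the Aurora PostgreSQL import would be something like:
#   SELECT aws_s3.table_import_from_s3(
#       'tbl_a', '', '(FORMAT csv)',
#       aws_commons.create_s3_uri('my-bucket', 'output.csv', 'ap-southeast-1'));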

igorborgest commented 4 years ago

@BellyTheMagnificent thanks for the info.

I've just added your sample to the test and found out that the root cause of the error was the NULL handling for double/numeric values.

I've already fixed it, but I still haven't been able to reproduce your error number 1 about "The file has been modified since the import call started.".

Are both errors connected? Or is the first one related to another dataset?

josecw commented 4 years ago

I'm not sure whether both are connected; I can only find out in the actual run environment. Is there any possibility that "The file has been modified since the import call started." is caused by S3 Server or Object logging?

The flow we use is (roughly as sketched below):

  1. Run read_sql_athena()
  2. Execute SQL (delete before load) on Aurora
  3. Run to_aurora()
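
With the newer to_aurora() path, that flow would look roughly like the sketch below (connection details, schema/table names, and the delete condition are placeholders):

import pg8000
import awswrangler as wr

# PEP 249 connection to Aurora PostgreSQL (all values are placeholders)
con = pg8000.connect(user="user", password="password", host="aurora-endpoint", database="mydb")

# 1. Read the Athena query result into a DataFrame
df = wr.pandas.read_sql_athena(sql="SELECT ...", database="my_database")

# 2. Preaction: delete before load
cursor = con.cursor()
cursor.execute("DELETE FROM my_schema.target_table WHERE batch_date = CURRENT_DATE")
con.commit()

# 3. Load the DataFrame into Aurora through S3
wr.pandas.to_aurora(
    dataframe=df,
    connection=con,
    schema="my_schema",
    table="target_table"
)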
igorborgest commented 4 years ago

I've found out the issue with "The file has been modified since the import call started.".

I had been testing this functionality only locally, with high latency between my machine and the Aurora cluster in my AWS account.

Now, testing on a SageMaker notebook, I could replicate your error and find the problem.

It's basically because copy/aws_s3.table_import_from_s3 suffers from S3's eventual consistency. It probably does some kind of LIST behind the scenes that does not see objects written immediately before the command execution.

Since Aurora PostgreSQL does not support a manifest like Aurora MySQL does, I had to handle that with retries.

Btw, I've tested a lot here and I think we are good to go for the next release!

igorborgest commented 4 years ago

Hey @BellyTheMagnificent, could you also test the new 0.2.5 version?

josecw commented 4 years ago

Hi @igorborgest, it works now. Data is loaded successfully to Aurora when the Athena query metadata matches the Aurora table column order. If the query result doesn't match the target table column order, it raises an error. This can be resolved by specifying the column list in the s3_import command. I will raise a separate enhancement issue for this matter.

Thank you very much!

josecw commented 4 years ago

Hi @igorborgest, sorry to come back to this issue thread again. Is there a way to increase max_attempt_number from the API?

@tenacity.retry(
    retry=tenacity.retry_if_exception_type(exception_types=ProgrammingError),
    wait=tenacity.wait_random_exponential(multiplier=0.5),
    stop=tenacity.stop_after_attempt(max_attempt_number=5),
    reraise=True,
    after=tenacity.after_log(logger, INFO)
)

I'm having some intermittent errors where S3 doesn't make it back within 5 retries.
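
For reference, a generic tenacity sketch (not awswrangler's actual internals; the function and exception type are placeholders) showing how the stop condition is set and how a decorated function can be re-run with a higher attempt limit via tenacity's retry_with helper:

import logging
from logging import INFO

import tenacity

logger = logging.getLogger(__name__)

@tenacity.retry(
    retry=tenacity.retry_if_exception_type(RuntimeError),
    wait=tenacity.wait_random_exponential(multiplier=0.5),
    stop=tenacity.stop_after_attempt(5),
    reraise=True,
    after=tenacity.after_log(logger, INFO),
)
def flaky_call():
    # Placeholder for the call that fails intermittently
    ...

# Per-call override: raise the attempt limit without touching the decorator
flaky_call.retry_with(stop=tenacity.stop_after_attempt(10))()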