aws / amazon-redshift-python-driver

Redshift Python Connector. It supports Python Database API Specification v2.0.
Apache License 2.0

Cannot write to Redshift table with the write_dataframe method. The method only takes two parameters; not sure where the "?" it reports for the table name comes from. #202

Closed baltimoreravens-it closed 2 weeks ago

baltimoreravens-it commented 6 months ago

Driver version

redshift_connector-2.0.918

Redshift version

PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.61191

Client Operating System

Windows 10

Python version

3.11

Table schema

(table schema was provided as an image attachment)

Problem description

  1. Expected behaviour: write the DataFrame into the Redshift table with the write_dataframe method.

  2. Actual behaviour: getting a KeyError that does not make clear where the issue is; the table_name keeps being passed in as "?".

  3. Error message/stack trace:

    
KeyError                                  Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:1796, in Connection.execute(self, cursor, operation, vals)
   1795 try:
-> 1796     ps = cache["ps"][key]
   1797     _logger.debug("Using cached prepared statement")

KeyError: ('select 1 from pg_catalog.svv_all_tables where table_name = ? and table_schema = ?', ((<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x0000024EC4E5E160>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x0000024EC4E5E160>)))

During handling of the above exception, another exception occurred:

ProgrammingError                          Traceback (most recent call last)
Cell In[97], line 11
      3 with redshift_connector.connect(
      4     host='xxxxxxxxxxxxxx.koresoftware.com',
      5     database='xxxxxxx',
      6     user='XXXXXXXXXXX',
      7     password='XXXXXXXXXXXXXXXXXXXXX'
      8 ) as conn:
      9     with conn.cursor() as cursor:
     10         # cursor.execute("create Temp table book(bookname varchar, author varchar)")
---> 11         cursor.write_dataframe(report_df_clean, 'custom.google_ad_mgr')
     12         # cursor.insert_data_bulk(output_file, "balravens.custom.google_ad_mgr", column_names=column_list, parameter_indices=range(0,19), delimiter="," )
     13         # cursor.execute("select * from custom.google_ad_mgr")
     14         # result = cursor.fetchall()
   (...)
     22 # password='XXXXXXXXXXXXXXXXXX'
     23 # )

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:584, in Cursor.write_dataframe(self, df, table)
    581 except ModuleNotFoundError:
    582     raise ModuleNotFoundError(MISSING_MODULE_ERROR_MSG.format(module="pandas"))
--> 584 if not self.__is_valid_table(table):
    585     raise InterfaceError("Invalid table name passed to write_dataframe: {}".format(table))
    586 sanitized_table_name: str = self.__sanitize_str(table)

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:553, in Cursor.__is_valid_table(self, table)
    551 if len(split_table_name) == 2:
    552     q += " and table_schema = ?"
--> 553     self.execute(q, (split_table_name[1], split_table_name[0]))
    554 else:
    555     self.execute(q, (split_table_name[0],))

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:248, in Cursor.execute(self, operation, args, stream, merge_socket_read)
    246 except:
    247     pass
--> 248 raise e
    249 return self

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:241, in Cursor.execute(self, operation, args, stream, merge_socket_read)
    239     self._c.execute(self, "begin transaction", None)
    240 self._c.merge_socket_read = merge_socket_read
--> 241 self._c.execute(self, operation, args)
    242 except Exception as e:
    243     try:

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:1877, in Connection.execute(self, cursor, operation, vals)
   1874 else:
   1875     raise e
-> 1877 self.handle_messages(cursor)
   1879 # We've got row_desc that allows us to identify what we're
   1880 # going to get back from this statement.
   1881 output_fc = tuple(self.redshift_types[f["type_oid"]][0] for f in ps["row_desc"])

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:2169, in Connection.handle_messages(self, cursor)
   2166 self.message_types[code](self._read(data_len - 4), cursor)
   2168 if self.error is not None:
-> 2169     raise self.error

ProgrammingError: {'S': 'ERROR', 'C': '42703', 'M': 'column "table_schema" does not exist in svv_all_tables', 'F': '../src/pg/src/backend/parser/parse_expr.c', 'L': '1835', 'R': 'transformColumnRef'}

6. Any other details that can be helpful:

## Python Driver trace logs
How would I get these?

## Reproduction code
```python

# There is more code before this that makes the call to Google Ad Manager, but what really matters here is what the df contains.

column_list = report_df.columns.tolist()

column_rename_mp = {cn : cn.split('.')[1].lower() for cn in column_list}

report_df_clean = report_df.rename(columns=column_rename_mp)  # This is a DF of data grabbed from Google Ad Manager, already cleaned up, with dtypes explicitly defined to match the Redshift table schema (as best I could match it)

# Redshift portion 
import redshift_connector
with redshift_connector.connect(
    host='xxxxxxxxxxxxxx.koresoftware.com',
    database='xxxxxxx',
    user='xxxxxxxxxxxxx',
    password='xxxxxxxxxxxxxxxxxxxx'
) as conn:
    with conn.cursor() as cursor:
        # cursor.execute("create Temp table book(bookname varchar, author varchar)")
        cursor.write_dataframe(report_df_clean, 'custom.google_ad_mgr')
```
Brooke-white commented 5 months ago

Hi @baltimoreravens-it, can you please confirm you're using the latest redshift-connector? We resolved this issue in the latest release, and I believe the error message you've provided corresponds to an earlier version which does not contain the fix.

baltimoreravens-it commented 5 months ago

redshift-connector==2.0.918 is the current version I have installed through pip.

Brooke-white commented 5 months ago

Based on the trace provided, self.__is_valid_table(table) is the method throwing the exception. I recognize the error message ProgrammingError: {'S': 'ERROR', 'C': '42703', 'M': 'column "table_schema" does not exist in svv_all_tables', 'F': '../src/pg/src/backend/parser/parse_expr.c', 'L': '1835', 'R': 'transformColumnRef'}, as it was resolved as part of this commit, ref https://github.com/aws/amazon-redshift-python-driver/issues/198, which went into 2.0.918. If you take a look at the commit, we were previously incorrectly querying the table_schema column of svv_all_tables (causing the error you see) and changed the query in 2.0.918. You can see this in our mainline. As a sanity check, I pip installed redshift-connector==2.0.918 and manually inspected the source to confirm table_schema is not queried from the svv_all_tables table anywhere in cursor.py.

Please try printing out the version of redshift-connector from your script as an additional confirmation that it is indeed the version in use. If the issue is still seen, please follow the README instructions to enable logging and share the debug logs on this issue.
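For reference, both checks can be done from the same script. The sketch below is only illustrative: it reads the installed version via importlib.metadata and assumes the driver emits its debug output under the `redshift_connector` logger name, as described in the README.

```python
import logging
from importlib.metadata import version

# Confirm which redshift-connector build the script is actually importing.
print(version("redshift-connector"))

# Route the driver's debug output to a file so the trace logs can be shared.
# The logger name "redshift_connector" is assumed here; check the README if no output appears.
logger = logging.getLogger("redshift_connector")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler("redshift_connector_debug.log"))
```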

baltimoreravens-it commented 5 months ago

Looks like you are correct, the error is gone! Now I am just getting a casting error, which is odd because I am casting the column as datetime but it is being read as smallint. The same df goes through just fine when using the pandas to_sql method with SQLAlchemy and the sqlalchemy-redshift driver. Not sure if it's another issue, but I just wanted to bring it up.

pd.to_datetime(report_df_clean['line_item_end_date'], format="ISO8601")  # this is what I'm casting to
pd.to_datetime(report_df_clean['line_item_start_date'], format="ISO8601")
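For context, a fuller version of that casting step might look like the sketch below. It is only illustrative and assumes the converted Series are assigned back to report_df_clean (pd.to_datetime returns a new Series rather than modifying the column in place), with a dtype check before the write.

```python
import pandas as pd

# Assign the converted Series back; pd.to_datetime does not modify the DataFrame in place.
# format="ISO8601" requires pandas >= 2.0.
report_df_clean['line_item_start_date'] = pd.to_datetime(
    report_df_clean['line_item_start_date'], format="ISO8601"
)
report_df_clean['line_item_end_date'] = pd.to_datetime(
    report_df_clean['line_item_end_date'], format="ISO8601"
)

# Verify the dtypes that will be handed to write_dataframe match the table schema.
print(report_df_clean.dtypes)
```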

KeyError                                  Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:1793, in Connection.execute(self, cursor, operation, vals)
   1792 try:
-> 1793     ps = cache["ps"][key]
   1794     _logger.debug("Using cached prepared statement")

KeyError: ('insert into custom.google_ad_mgr values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', ((<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.INTEGER: 23>, 1, <built-in method pack of _struct.Struct object at 0x000001D07D8077E0>), (<RedshiftOID.BIGINT: 20>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE4F150>), (<RedshiftOID.BIGINT: 20>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE4F150>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.SMALLINT: 21>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE23A60>), (<RedshiftOID.SMALLINT: 21>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE23A60>), (<RedshiftOID.UNKNOWN: 705>, 0, <function text_out at 0x000001D07B2A5B20>), (<RedshiftOID.SMALLINT: 21>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE23A60>), (<RedshiftOID.SMALLINT: 21>, 1, <built-in method pack of _struct.Struct object at 0x000001D07DE23A60>), (<RedshiftOID.FLOAT: 701>, 1, <built-in method pack of _struct.Struct object at 0x000001D07D7FD8A0>)))

During handling of the above exception, another exception occurred:

ProgrammingError                          Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:600, in Cursor.write_dataframe(self, df, table)
    599 elif len(arrays) > 1:
--> 600     self.executemany(sql, arrays)
    601 except:

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:272, in Cursor.executemany(self, operation, param_sets)
    271 for parameters in param_sets:
--> 272     self.execute(operation, parameters)
    273     rowcounts.append(self._row_count)

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:248, in Cursor.execute(self, operation, args, stream, merge_socket_read)
    247     pass
--> 248 raise e
    249 return self

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:241, in Cursor.execute(self, operation, args, stream, merge_socket_read)
    240 self._c.merge_socket_read = merge_socket_read
--> 241 self._c.execute(self, operation, args)
    242 except Exception as e:

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:1874, in Connection.execute(self, cursor, operation, vals)
   1872     raise e
-> 1874 self.handle_messages(cursor)
   1876 # We've got row_desc that allows us to identify what we're
   1877 # going to get back from this statement.

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\core.py:2166, in Connection.handle_messages(self, cursor)
   2165 if self.error is not None:
-> 2166     raise self.error

ProgrammingError: {'S': 'ERROR', 'C': '42804', 'M': 'column "line_item_start_date" is of type timestamp with time zone but expression is of type smallint', 'H': 'You will need to rewrite or cast the expression.', 'F': '../src/pg/src/backend/parser/parse_target.c', 'L': '675', 'R': 'updateTargetListEntry'}

During handling of the above exception, another exception occurred:

InterfaceError                            Traceback (most recent call last)
Cell In[125], line 11
      3 with redshift_connector.connect(
      4     host='xxxxxxxxxxx..koresoftware.com',
      5     database='xxxxxxxxxs',
      6     user='rxxxxxx_xxxxxz',
      7     password='xxxxxxxxxxx'
      8 ) as conn:
      9     with conn.cursor() as cursor:
     10         # cursor.execute("create Temp table book(bookname varchar, author varchar)")
---> 11         cursor.write_dataframe(report_df_clean, 'custom.google_ad_mgr')
     12         # cursor.insert_data_bulk(output_file, "balravens.custom.google_ad_mgr", column_names=column_list, parameter_indices=range(0,19), delimiter="," )
     13         # cursor.execute("select * from custom.google_ad_mgr")
     14         # result = cursor.fetchall()
   (...)
     22 # password='xxxxxxxxxxxxx'
     23 # )

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\redshift_connector\cursor.py:602, in Cursor.write_dataframe(self, df, table)
    600     self.executemany(sql, arrays)
    601 except:
--> 602     raise InterfaceError(
    603         "An error occurred when attempting to insert the pandas.DataFrame into ${}".format(table)
    604     )
    605 finally:
    606     self.paramstyle = cursor_paramstyle

InterfaceError: An error occurred when attempting to insert the pandas.DataFrame into $custom.google_ad_mgr