snowflakedb / snowflake-connector-python

Snowflake Connector for Python
https://pypi.python.org/pypi/snowflake-connector-python/
Apache License 2.0

SNOW-1625324: `execute_async()` + `get_results_from_sfqid()` has a naive error handler #2026

Closed: Kache closed this 2 months ago

Kache commented 3 months ago

Python version

Python 3.11.4 (main, Jan 10 2024, 15:34:31) [Clang 15.0.0 (clang-1500.1.0.2.5)]

Operating system and processor architecture

macOS-14.6-arm64-arm-64bit

Installed packages

snowflake-connector-python 3.2.0

but the issue is present in `master` as well

What did you do?

import logging
import snowflake.connector

logging.basicConfig(level=logging.INFO)

available_options = ['use execute()', 'use execute_async()', 'use workaround']
option = available_options[1]

def query(sql: str, configs: dict):
    with snowflake.connector.connect(**configs) as conn:
        with conn.cursor() as cur:

            # intent: log the query id before results are ready
            yield from execute_while_logging_sfqid(cur, sql)

def execute_while_logging_sfqid(cur: snowflake.connector.cursor.SnowflakeCursor, sql: str):
    if option == 'use execute()':
        # logging.info(f"Executed query: {cur.sfqid}")  # can't do this
        cur.execute(sql)
        logging.info(f"Executed query: {cur.sfqid}")  # unfortunately, after query is complete

        yield from cur

    elif option == 'use execute_async()':
        cur.execute_async(sql)
        logging.info(f"Executing query: {cur.sfqid}")  # good: logs before results are ready

        cur.get_results_from_sfqid(cur.sfqid)

        # unfortunately, wait_until_ready() has a naive error handler
        # that only says "Status of query '{}' is {}, results are unavailable":
        # https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1657-L1662
        yield from cur

    elif option == 'use workaround':
        cur.execute_async(sql)
        logging.info(f"Executing query: {cur.sfqid}")  # good: logs before results are ready

        cur.get_results_from_sfqid(cur.sfqid)

        if cur._prefetch_hook:
            try:
                cur._prefetch_hook()
            except snowflake.connector.DatabaseError:  # catch the naive error
                # use better error handler that raises more detailed errors:
                # https://github.com/snowflakedb/snowflake-connector-python/blob/main/src/snowflake/connector/connection.py#L1849-L1872
                cur.connection.get_query_status_throw_if_error(cur.sfqid)
                raise

        yield from cur

    else:
        yield from []

What did you expect to see?

Expected the `use execute_async()` path above to behave like the `use workaround` path, i.e. to raise the detailed error reported by the server rather than a generic one.

This can be done by replacing the naive query status/error handling in wait_until_ready(): https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1657-L1662

with the better query status/error handling of get_query_status_throw_if_error(): https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/connection.py#L1849-L1872
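To make the requested change concrete, here is a self-contained sketch contrasting the two error-handling styles against a stub. `StubConnection`, `naive_wait` and `detailed_wait` are hypothetical names, and the exception and status classes are local stand-ins, not the connector's own; the stub only mimics the shape of `get_query_status()` and `get_query_status_throw_if_error()` as they are used in this thread. Sleeping between polls is omitted for brevity.

```python
from enum import Enum


class QueryStatus(Enum):
    # Hypothetical subset of the status names seen in this thread
    RUNNING = "RUNNING"
    FAILED_WITH_ERROR = "FAILED_WITH_ERROR"
    SUCCESS = "SUCCESS"


# Local stand-ins for the connector's exception classes
class ProgrammingError(Exception):
    pass


class DatabaseError(Exception):
    pass


class StubConnection:
    """Hypothetical stand-in: reports RUNNING once, then a runtime failure."""

    def __init__(self, error_message: str):
        self._statuses = iter([QueryStatus.RUNNING, QueryStatus.FAILED_WITH_ERROR])
        self._error_message = error_message
        self._last = QueryStatus.RUNNING

    def get_query_status(self, sfqid: str) -> QueryStatus:
        # Advance through the scripted statuses; stay on the last one afterwards
        self._last = next(self._statuses, self._last)
        return self._last

    def get_query_status_throw_if_error(self, sfqid: str) -> QueryStatus:
        status = self.get_query_status(sfqid)
        if status is QueryStatus.FAILED_WITH_ERROR:
            # Surface the server-side error text instead of just the status
            raise ProgrammingError(self._error_message)
        return status

    def is_still_running(self, status: QueryStatus) -> bool:
        return status is QueryStatus.RUNNING


def naive_wait(conn: StubConnection, sfqid: str) -> None:
    # Naive pattern: poll the plain status, then raise a generic error
    while True:
        status = conn.get_query_status(sfqid)
        if not conn.is_still_running(status):
            break
    if status is not QueryStatus.SUCCESS:
        raise DatabaseError(
            f"Status of query '{sfqid}' is {status.name}, results are unavailable"
        )


def detailed_wait(conn: StubConnection, sfqid: str) -> None:
    # Proposed pattern: every poll goes through the throw-if-error variant
    while True:
        status = conn.get_query_status_throw_if_error(sfqid)
        if not conn.is_still_running(status):
            break
```

The only difference between the two loops is which status call they poll, which is the one-line change proposed in this issue.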

Can you set logging to DEBUG and collect the logs?

import logging
import os

for logger_name in ('snowflake.connector',):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

sfc-gh-sghosh commented 3 months ago

Hello @Kache ,

Thanks for raising the issue. We are investigating and will update.

Regards, Sujan

sfc-gh-sghosh commented 3 months ago

Hello @Kache ,

I just executed the code snippet in asynchronous mode with an incorrect SQL statement. I get the query ID and the proper error message; it is not losing the SQL compilation error, runtime error information, etc.

    elif option == 'use execute_async()':
        cur.execute_async(sql)
        logging.info(f"Executing execute_async query: {cur.sfqid}")  # good: logs before results are ready
        cur.get_results_from_sfqid(cur.sfqid)
        yield from cur

sql_query = "copy into sujancsv select * from mycsvtable1"

for result in query(sql_query, configs):
    print(result)

Output:
INFO:root:Executing execute_async query: 01b6862c-080b-17a3-0000-164926f2564a
INFO:snowflake.connector.connection:closed
INFO:snowflake.connector.connection:No async queries seem to be running, deleting session
ProgrammingError                          Traceback (most recent call last)
Cell In[20], line 72
     69 sql_query = "copy into sujancsv select * from mycsvtable1"
     71 # Execute and print results
---> 72 for result in query(sql_query, configs):
     73     print(result)

Cell In[20], line 26, in query(sql, configs)
     22 with snowflake.connector.connect(**configs) as conn:
     23     with conn.cursor() as cur:
     24 
     25         # intent: log the query id before results are ready
---> 26         yield from execute_while_logging_sfqid(cur, sql)

Cell In[20], line 41, in execute_while_logging_sfqid(cur, sql)
     38 cur.execute_async(sql)
     39 logging.info(f"Executing execute_async query: {cur.sfqid}")  # good: logs before results are ready
---> 41 cur.get_results_from_sfqid(cur.sfqid)
     43 # unfortunately, wait_until_ready() has a naive error handler
     44 # that only says "Status of query '{}' is {}, results are unavailable":
     45 # https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1657-L1662
     46 yield from cur

File /usr/local/lib/python3.8/site-packages/snowflake/connector/cursor.py:1679, in SnowflakeCursor.get_results_from_sfqid(self, sfqid)
   1676         if "data" in ret and "resultIds" in ret["data"]:
   1677             self._init_multi_statement_results(ret["data"])
-> 1679 self.connection.get_query_status_throw_if_error(
   1680     sfqid
   1681 )  # Trigger an exception if query failed
   1682 klass = self.__class__
   1683 self._inner_cursor = klass(self.connection)

File /usr/local/lib/python3.8/site-packages/snowflake/connector/connection.py:1824, in SnowflakeConnection.get_query_status_throw_if_error(self, sf_qid)
   1820         message += (
   1821             queries[0].get("errorMessage", "") if len(queries) > 0 else ""
   1822         )
   1823         sql_state = status_resp["data"].get("sqlState")
-> 1824     Error.errorhandler_wrapper(
   1825         self,
   1826         None,
   1827         ProgrammingError,
   1828         {
   1829             "msg": message,
   1830             "errno": int(code),
   1831             "sqlstate": sql_state,
   1832             "sfqid": sf_qid,
   1833         },
   1834     )
   1835 return status

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:290, in Error.errorhandler_wrapper(connection, cursor, error_class, error_value)
    267 @staticmethod
    268 def errorhandler_wrapper(
    269     connection: SnowflakeConnection | None,
   (...)
    272     error_value: dict[str, Any],
    273 ) -> None:
    274     """Error handler wrapper that calls the errorhandler method.
    275 
    276     Args:
   (...)
    287         exception to the first handler in that order.
    288     """
--> 290     handed_over = Error.hand_to_other_handler(
    291         connection,
    292         cursor,
    293         error_class,
    294         error_value,
    295     )
    296     if not handed_over:
    297         raise Error.errorhandler_make_exception(
    298             error_class,
    299             error_value,
    300         )

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:348, in Error.hand_to_other_handler(connection, cursor, error_class, error_value)
    346     return True
    347 elif connection is not None:
--> 348     connection.errorhandler(connection, cursor, error_class, error_value)
    349     return True
    350 return False

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:221, in Error.default_errorhandler(connection, cursor, error_class, error_value)
    219 errno = error_value.get("errno")
    220 done_format_msg = error_value.get("done_format_msg")
--> 221 raise error_class(
    222     msg=error_value.get("msg"),
    223     errno=None if errno is None else int(errno),
    224     sqlstate=error_value.get("sqlstate"),
    225     sfqid=error_value.get("sfqid"),
    226     query=error_value.get("query"),
    227     done_format_msg=(
    228         None if done_format_msg is None else bool(done_format_msg)
    229     ),
    230     connection=connection,
    231     cursor=cursor,
   232 )

ProgrammingError: 001003: 1003: SQL compilation error: syntax error line 1 at position 19 unexpected 'select'.

Kache commented 3 months ago

To reproduce this, you can't use a query that "fails too quickly", i.e. one whose failure is available and returned immediately. Here's an example of a query that runs for some time before failing:

        SELECT *
        FROM (
            SELECT SEQ4()::variant AS col
            FROM TABLE(GENERATOR(ROWCOUNT => 1000000))  -- increase as needed
            UNION
            SELECT 'fail'::variant AS col  -- fails at runtime when execution reaches this row
        ) a
        JOIN (VALUES (1)) b(col) ON a.col::varchar::int = b.col

Perhaps the following get_query_status(sfqid) call should instead be get_query_status_throw_if_error(sfqid)? https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1641
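Why the two reports differ seems to come down to timing. Judging from the traceback above (get_results_from_sfqid() calls get_query_status_throw_if_error() once, at cursor.py:1679) and from the workaround's use of `_prefetch_hook`, a query that has already failed is caught by that single up-front check, while one that is still running passes it and later fails inside the naive polling loop. The function below is a hypothetical model of that reading, not connector code; `outcome` and the result strings are illustrative only.

```python
from __future__ import annotations


def outcome(statuses: list[str]) -> str:
    """Model which error the caller sees, given the status returned by each poll.

    statuses[0] is what the single up-front check sees; the later polling
    loop (modelled on wait_until_ready()) starts from the same status.
    """
    if statuses[0] == "FAILED_WITH_ERROR":
        # Up-front throw-if-error check already sees the failure
        return "detailed ProgrammingError"
    for status in statuses:
        if status == "RUNNING":
            continue
        if status == "FAILED_WITH_ERROR":
            # Failure only discovered by the naive status poll
            return "generic DatabaseError"
        return "results"
    return "still running"
```

A compile-time error such as the one in the earlier example corresponds to `["FAILED_WITH_ERROR"]`, while the runtime-failing query corresponds to something like `["RUNNING", "RUNNING", "FAILED_WITH_ERROR"]`.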

Let me put together a complete script.

sfc-gh-sghosh commented 3 months ago

Hello @Kache ,

Thanks for the update; we are working on it and will update you.

Regards, Sujan

Kache commented 3 months ago

Here is a cleaner repro than in the original post:

runtime_fail_sql = """
    SELECT *
    FROM (
        SELECT SEQ4()::variant AS col
        FROM TABLE(GENERATOR(ROWCOUNT => 1000000))  -- increase as needed
        UNION
        SELECT 'fail'::variant AS col  -- fails at runtime when execution reaches this row
    ) a
    JOIN (VALUES (1)) b(col) ON a.col::varchar::int = b.col
"""

def full_error():
    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute(runtime_fail_sql)
            # snowflake.connector.errors.ProgrammingError: 100038 (22018): 01b6a58e-0308-12f7-76fd-87015479a46b: Numeric value 'fail' is not

            logging.info(cur.fetchall())

def async_naive_error():
    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute_async(runtime_fail_sql)
            assert cur.sfqid
            cur.get_results_from_sfqid(cur.sfqid)

            results = cur.fetchall()
            # snowflake.connector.errors.DatabaseError: Status of query '01b6a58e-0308-12f9-76fd-8701547996af' is FAILED_WITH_ERROR, results are unavailable

            logging.info(results)

def async_full_error():
    retry_pattern = it.chain(ASYNC_RETRY_PATTERN, it.repeat(ASYNC_RETRY_PATTERN[-1]))

    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute_async(runtime_fail_sql)
            assert cur.sfqid

            # custom wait loop, based on wait_until_ready() from cursor.get_results_from_sfqid()
            while True:
                status = cur.connection.get_query_status_throw_if_error(cur.sfqid)
                # snowflake.connector.errors.ProgrammingError: 100038: 100038: Numeric value 'fail' is not recognized

                if not cur.connection.is_still_running(status):
                    break

                time.sleep(0.5 * next(retry_pattern))

            cur.get_results_from_sfqid(cur.sfqid)
            logging.info(cur.fetchall())

In other words, it seems like this line should instead call get_query_status_throw_if_error(): https://github.com/snowflakedb/snowflake-connector-python/blob/050a1acea100ae045994c37c751fc71eaa364c03/src/snowflake/connector/cursor.py#L1641
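Until a fixed release is in use, the custom loop in async_full_error() can be factored into a reusable helper. The sketch below is hypothetical (`wait_for_query`, `DEFAULT_BACKOFF` and the `timeout`, `scale`, `sleep` and `clock` parameters are illustrative choices, not connector API); it relies only on the two SnowflakeConnection methods already used in this thread, `get_query_status_throw_if_error()` and `is_still_running()`, and adds an overall timeout so a stuck query does not block forever.

```python
import itertools
import time
from typing import Any, Callable, Sequence

# Hypothetical backoff multipliers; the last value repeats once exhausted.
DEFAULT_BACKOFF: Sequence[float] = (1, 1, 2, 3, 4, 8, 10)


def wait_for_query(
    conn: Any,
    sfqid: str,
    timeout: float = 600.0,
    scale: float = 0.5,
    backoff: Sequence[float] = DEFAULT_BACKOFF,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Any:
    """Poll until the query stops running; raise the server's error if it failed.

    `conn` only needs get_query_status_throw_if_error() and is_still_running().
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    delays = itertools.chain(backoff, itertools.repeat(backoff[-1]))
    deadline = clock() + timeout
    while True:
        # Raises a detailed error (not a generic status message) if the query failed
        status = conn.get_query_status_throw_if_error(sfqid)
        if not conn.is_still_running(status):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"query {sfqid} still running after {timeout}s")
        sleep(scale * next(delays))
```

In async_full_error(), the `while True` loop could then be replaced by `wait_for_query(cur.connection, cur.sfqid)` before calling `cur.get_results_from_sfqid(cur.sfqid)`.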

sfc-gh-aling commented 2 months ago

hey @Kache, thanks for your feedback. I have a PR out to improve the error experience for async queries: https://github.com/snowflakedb/snowflake-connector-python/pull/2035

sfc-gh-aling commented 2 months ago

hey @Kache, we have released v3.12.2 with the fix. Could you try out the latest version?
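A quick way to confirm that an environment is on a release containing the fix (v3.12.2, per the comment above) is to read the installed version with the standard library's `importlib.metadata`. This is a minimal sketch; `parse_release` and `has_async_error_fix` are hypothetical helpers, and the parser is deliberately simple (it keeps only the leading digits of each dotted part, so pre-release suffixes are ignored). `packaging.version.Version` would be more robust if that third-party dependency is acceptable.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def parse_release(text: str) -> tuple:
    """Turn '3.12.2' into (3, 12, 2); keeps leading digits of each part."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def has_async_error_fix(installed: str, fixed_in: str = "3.12.2") -> bool:
    # Tuple comparison orders releases component by component
    return parse_release(installed) >= parse_release(fixed_in)


if __name__ == "__main__":
    try:
        installed = version("snowflake-connector-python")
    except PackageNotFoundError:
        print("snowflake-connector-python is not installed")
    else:
        print(installed, "includes the async error fix:", has_async_error_fix(installed))
```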