snowflakedb / snowflake-connector-python

Snowflake Connector for Python
https://pypi.python.org/pypi/snowflake-connector-python/
Apache License 2.0
579 stars 467 forks source link

SNOW-1643781: Async queries: unable to re-establish connection/keep connection alive #2038

Open Kache opened 3 weeks ago

Kache commented 3 weeks ago

Python version

Python 3.11.4 (main, Jan 10 2024, 15:34:31) [Clang 15.0.0 (clang-1500.1.0.2.5)]

Operating system and processor architecture

Linux-6.10.6-orbstack-00249-g92ad2848917c-x86_64-with-debian-10.13

Installed packages

snowflake-connector-python 3.2.0

What did you do?

I want to log the query id without needing to wait for the query to have completed.

Starting from sync usage:

    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=DictCursor) as cur:
            cur.execute(sql_str)

            # logs after (potentially long) query completion
            logging.info(f"Query id: {cur.sfqid}")
            yield from cur

Attempt: use async

            cur.execute_async(sql_str)
            # logs right away
            logging.info(f"Query id: {cur.sfqid}")
            cur.get_results_from_sfqid(cur.sfqid)
            yield from cur

However, the above only works for fast queries. We use ABORT_DETACHED_QUERY = TRUE because we want queries to passively stop if/when the tasks/processes driving the original query stop/terminate, so queries > 5 mins get canceled:

snowflake.connector.errors.ProgrammingError: 000604: 604: SQL execution canceled

Although docs says:

To perform asynchronous queries, you must ensure the ABORT_DETACHED_QUERY configuration parameter is FALSE (default value).

It also says:

For asynchronous queries ... If ABORT_DETACHED_QUERY is set to TRUE, Snowflake automatically aborts all in-progress asynchronous queries when a client connection is not re-established after five minutes.

Sounds like just need a way to either keep the connection alive or re-connect.

Attempt: use async, ABORT_DETACHED_QUERY, and active heartbeat queries to "keep alive"

    import itertools as it
    from snowflake.connector.cursor import ASYNC_RETRY_PATTERN, DictCursor

    query_id = None
    wait_310_sql = "SELECT SYSTEM$WAIT(310)"
    retry_pattern = it.chain(ASYNC_RETRY_PATTERN, it.repeat(ASYNC_RETRY_PATTERN[-1]))
    tstart = time.monotonic()
    t_heartbeat = tstart

    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=DictCursor) as cur:
            cur.execute('ALTER SESSION SET ABORT_DETACHED_QUERY = TRUE')
            cur.execute_async(wait_310_sql)
            query_id = cur.sfqid
            assert query_id
            logging.info(f"Query ID: {query_id}")

            # wait loop based on wait_until_ready() from cursor.get_results_from_sfqid()
            while True:
                status = cur.connection.get_query_status_throw_if_error(query_id)
                now = time.monotonic()
                logging.info(f"{now - tstart:>6.1f} Query status: {status}")
                if not cur.connection.is_still_running(status):
                    break

                if now - t_heartbeat > 60 * 2:  # every ~2 mins
                    cur.execute("SELECT 'heartbeat'")  # try to keep connection alive
                    t_heartbeat = time.monotonic()

                time.sleep(0.5 * next(retry_pattern))

            cur.get_results_from_sfqid(query_id)
            logging.info(cur.fetchall())

However, the query gets cancelled all the same in 5 minutes.

Using a "re-connect" strategy by using a new connection every couple minutes (rather than using the same connection) is similarly ineffective.

What did you expect to see?

Expected to be able to prevent an async query from being canceled even though ABORT_DETACHED_QUERY = TRUE by either actively keeping the connection alive or by actively re-connecting.

Keeping ABORT_DETACHED_QUERY = TRUE is desirable because we want queries to passively stop if/when the tasks/processes driving the original query stop/terminate.

Can you set logging to DEBUG and collect the logs?

n/a

sfc-gh-dszmolka commented 2 weeks ago

hi - thanks for submitting this issue and especially for the reproduction. taking a look .

sfc-gh-dszmolka commented 2 weeks ago

i can confirm the async query gets cancelled anyways after 5 minutes even though the additional 'heartbeat' queries happening every 2 minutes. Apparently, a single query will not be recognized as a heartbeat. We have a private function to send request to snowflake to keep the connection alive (this is happening when client_session_keep_alive is set) - will check how it can be utilized in this scenario to properly send requests to /session/heartbeat

edit: replaced the SELECT 'heartbeat' with conn.rest._heartbeat() which , confirmed from the logs, correctly reaches out to the heartbeat endpoint but still did not prevent the SQL from being canceled after 5 minutes. We need to investigate further.

sfc-gh-dszmolka commented 2 weeks ago

tested various methods to no avail, until my colleague @sfc-gh-hachouraria figured out that sending a request to /queries/<queryId>/results allows the session to continue past the 5 minute mark.

This is done by issuing cur.query_result(query_id) ; tested it with your repro program and indeed these calls to query_result, instead issuing the heartbeat calls, allowed the detached async query to live more than 5 minutes even with ABORT_DETACHED_QUERY=TRUE and run to completion

Of course there aren't really any query results in the query_result until the query actually finishes, so probably needs to be wrapped in some logic when used as an alternative 'heartbeat' to prevent detached async query from being aborted, but perhaps could help in your situation.

Can you please try it and let us know if it works for you ?

Kache commented 1 week ago

Switching out cur.execute("SELECT 'heartbeat'") for cur.query_result(query_id) alone didn't work for me. I observed:

In addition:

Kache commented 1 week ago

Since the endpoint takes 30-50 sec to respond, I'm thinking of running it in a thread with daemon=True to "fire and forget" the heartbeat, rather than be blocked for an unused response

Just a little worried about potential bad resource cleanup though