googleapis / python-bigquery-sqlalchemy

SQLAlchemy dialect for BigQuery
MIT License
426 stars 127 forks source link

BQ storage API is used inappropriately when executing query with `@compiles` #1070

Open gallodustin opened 4 months ago

gallodustin commented 4 months ago

My rough understanding is that connecting using sqlalchemy-bigquery creates two clients, one with and one without the BQ storage API enabled, and attempts to be "smart" about determining which queries should be executed using which client. There appears to be an issue with this logic when executing certain view-creating statements that are compiled via SQLAlchemy's @compiles decorator. See below for a specific example.

Thanks in advance for your help!

Environment details

Steps to reproduce

See the code example below

  1. Create a connection to BQ using the DBAPI as usual, with the BQ storage API enabled.
  2. Attempt conn.execute(obj), where obj is an object whose executable query string is compiled via sqlalchemy.ext.compiler.compiles.

Code example

import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import ClauseElement, Executable

class CreateObjectAs(Executable, ClauseElement):
    """Executable clause element carrying the parts of a CREATE ... AS statement.

    The instance only stores its arguments; turning them into SQL text is
    left to whatever compilation hook is registered for this class.

    Args:
        name: Name of the object to create.
        query: Query text that defines the object.
        type: Kind of object to create (defaults to ``"TABLE"``).
    """

    def __init__(self, name, query, type="TABLE"):
        # ``type`` shadows the builtin, but it is part of the keyword
        # interface callers use, so the parameter name is kept.
        self.name, self.query, self.type = name, query, type

@compiles(CreateObjectAs)
def _create_object_as(element, compiler, **kw):
    """Render *element* as ``CREATE <type> <name> AS (<query>)``.

    ``compiler`` and ``**kw`` are accepted but not used; the three parts are
    interpolated into the statement text as-is.
    """
    template = "CREATE %s %s AS (%s)"
    parts = (element.type, element.name, element.query)
    return template % parts

# Reproduction script: connect with streaming execution options, then run a
# CreateObjectAs statement both as a clause element and as a plain string.

# Engine keyword arguments; the trailing ``...`` marks fields elided from the
# report, so this line is a placeholder rather than runnable code.
create_engine_kwargs = {"type": "service_account", "project_id": "oauth-test-dev-303921", ...}
# Options applied to the connection below before any statement is executed.
execution_options = {"yield_per": 10000, "stream_results": True}

# The connection URL is redacted ("***") in the report.
engine = sa.create_engine(url="***", **create_engine_kwargs)
conn = engine.connect()
# execution_options() returns the connection to use from here on.
conn = conn.execution_options(**execution_options)

# NOTE(review): both literals below embed single quotes inside the double
# quotes, while the str(query) shown further down has none around the name or
# the SELECT -- presumably a repr() copy/paste artifact; confirm with reporter.
full_tbl_name = "'oauth-test-dev-303921.dcdb_9c9ead69_7e13_40c6_91be_33072c1c21ea.dcdb_bc8ea959_48c3_4ef8_9903_a4a9fc46e98a'"
query = "'SELECT * \nFROM `oauth-test-dev-303921.datachattestconnect3`.`bikeshare_stations`'"

# Rebinds ``query`` from the raw SELECT text to the executable element.
query = CreateObjectAs(full_tbl_name, query, type="view")

# str(query) = 'CREATE view oauth-test-dev-303921.dcdb_9c9ead69_7e13_40c6_91be_33072c1c21ea.dcdb_bc8ea959_48c3_4ef8_9903_a4a9fc46e98a AS (SELECT * \nFROM `oauth-test-dev-303921.datachattestconnect3`.`bikeshare_stations`)'

# The two execute() calls are alternatives: as written, Option 2 is never
# reached when Option 1 raises. Cleanup runs either way via ``finally``.
try:
    conn.execute(query) # Option 1: this raises the exception "google.api_core.exceptions.InvalidArgument: 400 request failed: non-table entities cannot be read with the storage API"
    conn.execute(str(query)) # Option 2: this succeeds
finally:
    conn.close()
    engine.dispose()

Stack trace (when conn.execute(query) is called)

Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 72, in error_remapped_callable
    return callable_(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "request failed: non-table entities cannot be read with the storage API"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:142.251.33.74:443 {created_time:"2024-04-30T17:15:02.03113309+00:00", grpc_status:3, grpc_message:"request failed: non-table entities cannot be read with the storage API"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1916, in _execute_context
    result = context._setup_result_proxy()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 1463, in _setup_result_proxy
    strategy = _cursor.BufferedRowCursorFetchStrategy(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py", line 1043, in __init__
    self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/_helpers.py", line 494, in with_closed_check
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 370, in fetchmany
    self._try_fetch(size=size)
  File "/opt/venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 266, in _try_fetch
    rows_iterable = self._bqstorage_fetch(bqstorage_client)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 305, in _bqstorage_fetch
    read_session = bqstorage_client.create_read_session(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py", line 622, in create_read_session
    response = rpc(
               ^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 113, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
           ^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 74, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 request failed: non-table entities cannot be read with the storage API