MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0
6.99k stars 404 forks source link

[ERROR] pd.read_sql_query with param #971

Closed rumbarum closed 2 years ago

rumbarum commented 2 years ago

I am trying to get DB data directly with pd.read_sql, using asyncpg.

The code below, without params, works:

import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from sqlalchemy.sql.expression import TextClause
from sqlalchemy.types import Text

# Async engine shared by the helpers below. ``pool_recycle`` is in seconds,
# so pooled connections are replaced after one hour.
# NOTE(review): ``config`` is the project's settings object (not shown here).
async_engine = create_async_engine(
    config.WRITER_DB_URL,
    pool_recycle=3600,
)

# some trick to run sync block as async
async def get_df_from_sql(stmt: Text, engine: AsyncEngine) -> pd.DataFrame:
    def _read_sql(con: AsyncConnection, stmt):
        return pd.read_sql_query(stmt, con)

    async with engine.connect() as conn:
        data = await conn.run_sync(_read_sql, stmt)
    return data

query = "SOME QUERY"

data_df = await get_df_from_sql(text(query))

After confirming that the code above works, I tried adding params to the query to prevent SQL injection, as below:

...

# Async engine shared by the helpers below. ``pool_recycle`` is in seconds,
# so pooled connections are replaced after one hour.
# NOTE(review): ``config`` is the project's settings object (not shown here).
async_engine = create_async_engine(
    config.WRITER_DB_URL,
    pool_recycle=3600,
)

# Parameterized variant: bind values travel separately from the SQL text, so
# they are never interpolated into the statement. ``AsyncConnection.run_sync``
# calls the given function with a synchronous-style ``Connection`` that pandas
# can use.
async def get_df_from_sql(
    stmt: TextClause, params: dict, engine: AsyncEngine
) -> pd.DataFrame:
    """Execute *stmt* with bind *params* on *engine* and return a DataFrame.

    Args:
        stmt: A textual SQL statement built with ``text()``. Its placeholders
            must use SQLAlchemy's named ``:name`` style, and they may only
            stand in for values, never for identifiers such as table names.
        params: Mapping from placeholder name (without the colon) to value.
        engine: Async engine from which a connection is opened.

    Returns:
        The DataFrame built by ``pd.read_sql_query`` from the query result.
    """

    def _read_sql(con: Connection, stmt: TextClause, params: dict) -> pd.DataFrame:
        # ``con`` is the synchronous Connection supplied by ``run_sync``.
        # ``read_sql_query`` matches the non-parameterized helper; for a
        # ``TextClause`` it is the same path ``read_sql`` would take.
        return pd.read_sql_query(stmt, con, params=params)

    async with engine.connect() as conn:
        data = await conn.run_sync(_read_sql, stmt, params)
    return data

query = "SOME QUERY with param"
param = {"key":"val" ... } 

data_df = await get_df_from_sql(text(query), params, async_engine)

I got the result below; pd.read_sql raises the same error.

  File "/Users/rumbarum/ast/fastapi-boilerplate/app/chart/service.py", line 85, in get_event_count
    data_df = await get_df_from_sql(text(anormal_table_query), {"anormal_table":anormal_table, "table":table, "start_time":str(start_time),"end_time": str(end_time)}, connector)
  File "/Users/rumbarum/ast/fastapi-boilerplate/core/helpers/pandas.py", line 11, in get_df_from_sql
    data = await conn.run_sync(_read_sql, stmt, params)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/engine.py", line 548, in run_sync
    return await greenlet_spawn(fn, conn, *arg, **kw)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/Users/rumbarum/ast/fastapi-boilerplate/core/helpers/pandas.py", line 8, in _read_sql
    return pd.read_sql(stmt, con, params=params)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/pandas/io/sql.py", line 590, in read_sql
    return pandas_sql.read_query(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/pandas/io/sql.py", line 1560, in read_query
    result = self.execute(*args)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/pandas/io/sql.py", line 1405, in execute
    return self.connectable.execution_options().execute(*args, **kwargs)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/future/engine.py", line 280, in execute
    return self._execute_20(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
    self._handle_exception(error)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/Users/rumbarum/ast/fastapi-boilerplate/.venv/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.InFailedSQLTransactionError'>: current transaction is aborted, commands ignored until end of transaction block
[SQL: 
    SELECT
        A.event_id AS anormal_event_id,
        B.*
    FROM (
        SELECT
            event_id
        FROM
            log_data."%s") AS A
        RIGHT JOIN (
            SELECT
                event_id,
                event_name,
                aws_service,
                event_time
            FROM
                log_data."%s"
            WHERE
                event_time >= '%s'
                AND event_time <= '%s') AS B ON A.event_id = B.event_id;
    ]
[parameters: ('anomal_detection_fake_data', 'cloudtrail_fake_data', '2022-10-18T17:00:00+00:00', '2022-10-19T17:00:00+00:00')]
(Background on this error at: https://sqlalche.me/e/14/dbapi)
rumbarum commented 2 years ago

Related https://github.com/MagicStack/asyncpg/issues/605#issuecomment-683192217