dikshant opened this issue 1 year ago
They mentioned they are using pandas, which uses SQLAlchemy underneath. I do wonder whether pandas is making the right call under the hood.
If they are using vanilla pandas .to_sql(), then it is most likely using plain old DBAPI .executemany(). If something special is required for pandas, then they might need to use a "callable with signature" for the method= argument of .to_sql(). One example of such a method for PostgreSQL is here:
https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
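Per the pandas docs, that method= callable receives the table wrapper, the connection, the column names, and an iterator of data rows. A bare-bones sketch of that shape (insert_method is an illustrative name; this assumes table.table exposes the underlying SQLAlchemy Table, as on pandas' SQLTable wrapper):

def insert_method(table, conn, keys, data_iter):
    # table: pandas.io.sql.SQLTable wrapping the target table
    # conn: the sqlalchemy.engine.Connection pandas is using
    # keys: list of column names
    # data_iter: iterable of row tuples
    data = [dict(zip(keys, row)) for row in data_iter]
    conn.execute(table.table.insert(), data)

# df.to_sql("table1", engine, method=insert_method)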
I took a look at the pandas GitHub repo. The code checks for a SQLAlchemy connection and then starts using the normal statement functionality. The code keeps going down the normal SQL text() statement path. Hopefully this information is helpful.
sql.py
if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Engine)):
    raise TypeError("pandas.io.sql.execute requires a connection")  # GH50185

with pandasSQL_builder(con, need_transaction=True) as pandas_sql:
    return pandas_sql.execute(sql, params)


def pandasSQL_builder(con, schema=None, need_transaction=False):
    ...
    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
        return SQLDatabase(con, schema, need_transaction)
class SQLDatabase(PandasSQL):
    def __init__(
        self, con, schema: str | None = None, need_transaction: bool = False
    ) -> None:
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.schema import MetaData

        # self.exit_stack cleans up the Engine and Connection and commits the
        # transaction if any of those objects was created below.
        # Cleanup happens either in self.__exit__ or at the end of the iterator
        # returned by read_sql when chunksize is not None.
        self.exit_stack = ExitStack()
        if isinstance(con, str):
            con = create_engine(con)
            self.exit_stack.callback(con.dispose)
        if isinstance(con, Engine):
            con = self.exit_stack.enter_context(con.connect())
        if need_transaction and not con.in_transaction():
            self.exit_stack.enter_context(con.begin())
        self.con = con
        self.meta = MetaData(schema=schema)
        self.returns_generator = False

    def __exit__(self, *args) -> None:
        if not self.returns_generator:
            self.exit_stack.close()
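In other words, when handed a plain Engine, SQLDatabase opens its own connection and transaction via the ExitStack. One consequence, given the con.in_transaction() check above (a sketch; the URL and table name are illustrative): passing an explicit Connection instead of an Engine lets the caller own the transaction.

import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("cockroachdb+psycopg2://root@localhost:26257/defaultdb")
df = pd.DataFrame([(1, "foo")], columns=["id", "txt"])

# SQLDatabase sees an existing Connection that is already in a
# transaction, so the need_transaction/con.begin() branch is skipped
with engine.connect() as conn:
    with conn.begin():
        df.to_sql("table1", conn, if_exists="append", index=False)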
Further to my earlier comment, this is what I had in mind:
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cockroachdb.transaction import run_transaction

engine = sa.create_engine(
    "cockroachdb+psycopg2://root@localhost:26257/defaultdb"
)


def crdb_insert_trans(table, eng, keys, data_iter):
    # to_sql() "method=" adapted from the example at
    # https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
    def callback(conn):
        tbl = sa.Table(
            table.name,
            sa.MetaData(),
            schema=table.schema,
            autoload_with=eng,
        )
        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(sa.insert(tbl), data)

    run_transaction(eng, callback)


df = pd.DataFrame([(1, "foo"), (2, "bar")], columns=["id", "txt"])
df.to_sql(
    "table1", engine, if_exists="append", index=False, method=crdb_insert_trans
)
@dikshant - Does my code sample above resolve the issue?
@gordthompson The code calls the pandas read_sql_query().
engine = SqlDatabase.create_engine(connection_string, timeout_seconds, app_name)
try:
    assert engine, "SQL Alchemy Engine is NULL"
    with engine.connect() as conn:
        if chunk_size:
            conn = conn.execution_options(stream_results=True)
        df = None
        for chunk_df in pd.read_sql_query(
            get_qualified_gigs_prod_query,
            params=params,
            con=conn,
            parse_dates=parse_dates,
            chunksize=chunk_size,
        ):
            if df is None:
                df = chunk_df
            else:
                df = pd.concat([df, chunk_df])
@ddennerline3 - So is read_sql_query() ultimately throwing a transaction-related exception? If so, can you provide a stack trace?
pd.read_sql_query() is raising an error that is probably being swallowed somewhere after the exception is raised:

(psycopg2.errors.SerializationFailure) restart transaction: TransactionRetryWithProtoRefreshError: ReadWithinUncertaintyIntervalError: read at time 1681307575.550021729,0 encountered previous write with future timestamp 1681307575.677729181,0 within uncertainty interval `t <= (local=0,0, global=1681307575.800021729,0)`; observed timestamps: [{4 1681307575.771496910,0} {5 1681307578.720783940,0} {6 1681307575.771576082,0} {9 1681307575.550021729,0}]: "sql txn" meta={id=cdc6eb3f key=/Min
Try wrapping the .read_sql_query() call in a function that invokes it via the run_transaction() helper. A simplified example would be:
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cockroachdb.transaction import run_transaction

engine = sa.create_engine(
    "cockroachdb+psycopg2://root@localhost:26257/defaultdb"
)


def read_sql_qry_trans(qry, engine_):
    def callback(conn):
        return pd.read_sql_query(qry, conn)

    return run_transaction(engine_, callback)


sql = """\
SELECT 1 AS id, 'foo' AS txt
UNION ALL
SELECT 2 AS id, 'bar' AS txt
"""
df = read_sql_qry_trans(sql, engine)
print(df)
"""
   id  txt
0   1  foo
1   2  bar
"""
Is run_transaction() compatible with Postgres? We would like to maintain one codebase. If not compatible, I saw it's possible for SQLAlchemy to check the engine instance type.
Is run_transaction() compatible with Postgres?
I just tried my sample code above with a postgresql+psycopg2://… URL against a real PostgreSQL server and nobody complained.
If not compatible, I saw it's possible for SQLAlchemy to check the engine instance type.
True. Your wrapper could always call run_transaction() for the cockroachdb dialect and just invoke the callback (with read_sql_query()) directly for postgresql.
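A minimal sketch of that wrapper, checking the engine's dialect name rather than the instance type (the function name read_sql_qry_portable is illustrative):

import pandas as pd
from sqlalchemy_cockroachdb.transaction import run_transaction


def read_sql_qry_portable(qry, engine_):
    def callback(conn):
        return pd.read_sql_query(qry, conn)

    if engine_.dialect.name == "cockroachdb":
        # retry-aware path: run_transaction() re-runs the callback
        # on CockroachDB transaction retry errors
        return run_transaction(engine_, callback)
    # plain path for postgresql (and any other dialect)
    with engine_.connect() as conn:
        return callback(conn)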
I recently built a custom DB query tool and used SQLAlchemy; it started experiencing transaction errors. To work around the Transaction retry errors, I created this run_statement() function below. I saw the CRDB adapter code is following a similar pattern, but I don't believe it’s auto-retrying. FWIW, it took some effort to find the right combination of exceptions and error code to make this function work correctly. It works for both Postgres and CRDB.
import random
import time

import psycopg2
import psycopg2.errorcodes
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import TextClause


def run_statement(db_conn: Session, stmt: TextClause, bind_params: dict, max_retries=3):
    """Run SQL statement and retry connection for SerializationErrors

    Args:
        db_conn: Active MainAPI DB connection
        stmt: the SQLAlchemy text() statement to execute
        bind_params: a dictionary containing all input values for SQL
        max_retries: If non-None, then set the maximum number of retries

    Returns:
        The SQLAlchemy ResultSet list of dictionaries
    """
    retry_count = 0
    while True:
        try:
            sql = db_conn.execute(stmt, bind_params)
            return sql
        except DBAPIError as e:
            db_conn.rollback()
            # max_retries=None here means "never retry"
            if max_retries is None or retry_count >= max_retries:
                raise
            retry_count += 1
            if isinstance(e.orig, psycopg2.OperationalError):
                if e.orig.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # exponential backoff with jitter (value is in seconds)
                    sleep_ms = (2**retry_count) * 0.1 * (random.random() + 0.5)
                    time.sleep(sleep_ms)
                    continue
            raise
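Usage looks something like this (a sketch; the engine URL, table, and column names are illustrative):

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("cockroachdb+psycopg2://root@localhost:26257/defaultdb")
with Session(engine) as session:
    result = run_statement(
        session,
        text("SELECT id, txt FROM table1 WHERE id = :id"),
        {"id": 1},
    )
    print(result.mappings().all())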
@ddennerline3 - Thanks for sharing your code. Just to confirm: did it solve the read_sql_query() problem for you?
I saw the CRDB adapter code is following a similar pattern, but I don't believe it’s auto-retrying.
Did you try it? It's certainly set up to do that.
One subtle difference between your code and the code in transaction.py is that your code interprets max_retries=None as "never retry", whereas the existing adapter code interprets max_retries=None as "unlimited retries".
There are two issues I was trying to address:
On both items, I suspect that if the transaction.py code were changed slightly, then raw SQLAlchemy statements and pandas may operate correctly.
The run_statement() code above is required because the transaction.py code doesn't seem to retry.
except sqlalchemy.exc.DatabaseError as e:  # <<< I am not sure this exception is caught
    if max_retries is not None and retry_count >= max_retries:
        raise
    retry_count += 1
    if isinstance(e.orig, psycopg2.OperationalError):
        if e.orig.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
            if max_backoff > 0:
                retry_exponential_backoff(retry_count, max_backoff)
            continue
I didn't try to 100% emulate the transaction.py interface; hence the difference in how max_retries=None is handled.
Oh, so you suspect that except sqlalchemy.exc.DatabaseError as e: in transaction.py might be too specific?
Also, are you running SQLAlchemy 2.0, or 1.4?
SQLAlchemy 2.0
I originally tried to use DatabaseError, but that exception wasn't caught. When I shifted to DBAPIError, the exception was caught and I could check the original error.
I did have the benefit of running the code on a stressed cluster in a debugger.
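For what it's worth, in SQLAlchemy's exception hierarchy DatabaseError is itself a subclass of DBAPIError, so catching DBAPIError is strictly broader; a quick check:

import sqlalchemy.exc as sa_exc

# DBAPIError is the base wrapper for errors raised by the DBAPI driver;
# DatabaseError (and OperationalError below it) are narrower subclasses
assert issubclass(sa_exc.DatabaseError, sa_exc.DBAPIError)
assert issubclass(sa_exc.OperationalError, sa_exc.DatabaseError)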
Okay, thanks. I have pushed an update to my branch. Before I submit a PR I would be grateful if you could
pip install git+https://github.com/gordthompson/sqlalchemy-cockroachdb@issue_207
and verify that it avoids the issue in your particular environment.
We have a customer who is experiencing issues with transaction retries not being handled in SQLAlchemy 1.4.x. More details to follow, but we would like to add transaction retries where applicable.
cc: @gordthompson @rafiss