Mause / duckdb_engine

SQLAlchemy driver for DuckDB

Reusing engines from different threads #1110

Open dhirschfeld opened 1 month ago

dhirschfeld commented 1 month ago

My observation is that passing an engine connected to an in-memory DuckDB database to a different thread doesn't work.

I'm wondering if that's expected or if it would be considered a bug / missing feature?

Example:

```python
import anyio
import sqlalchemy as sa
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
)

class Base(DeclarativeBase):
    pass

seq = sa.Sequence("user_id")

class User(Base):
    __tablename__ = "Users"
    id: Mapped[int] = mapped_column(
        seq,
        server_default=seq.next_value(),
        primary_key=True,
    )
    name: Mapped[str] = mapped_column(sa.String(30))

engine = sa.create_engine("duckdb:///:memory:")
Base.metadata.create_all(bind=engine)

with Session(engine) as session:
    spongebob = User(name="spongebob")
    sandy = User(name="sandy")
    patrick = User(name="patrick")
    session.add_all([spongebob, sandy, patrick])
    session.commit()

def run_query(engine: sa.Engine):
    with engine.connect() as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()
```

Running the `run_query` function in the main thread works as expected:

```python
>>> run_query(engine)
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
```

...but if I run it in a background thread I get a `Catalog Error: Table with name Users does not exist!` exception 😔

My assumption is that the engine loses its connection to the in-memory database created in the main thread, and each new connection gets a fresh in-memory database where that table doesn't exist?

>>> await anyio.to_thread.run_sync(run_query, engine)

```python-traceback
---------------------------------------------------------------------------
CatalogException                          Traceback (most recent call last)
File .../duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    139 else:
--> 140     self.__c.execute(statement, parameters)
    141 except RuntimeError as e:

CatalogException: Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^

The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
Cell In[10], line 1
----> 1 await anyio.to_thread.run_sync(run_query, engine)
... (anyio/trio worker-thread and SQLAlchemy dispatch frames elided) ...
Cell In[8], line 3, in run_query(engine)
      1 def run_query(engine: sa.Engine):
      2     with engine.connect() as conn:
----> 3         return conn.execute(sa.text("select * from Users")).fetchall()
... (SQLAlchemy execution frames elided) ...
File .../duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    138     self.__c.execute(statement)
    139 else:
--> 140     self.__c.execute(statement, parameters)

ProgrammingError: (duckdb.duckdb.CatalogException) Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^
[SQL: select * from Users]
(Background on this error at: https://sqlalche.me/e/20/f405)
```
dhirschfeld commented 1 month ago

If I change the `run_query` function to instead accept a `sa.Connection` then the query works in a background thread:

```python
>>> def run_query(conn: sa.Connection):
...     return conn.execute(sa.text("select * from Users")).fetchall()

>>> with engine.connect() as conn:
...     res = run_query(conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

>>> with engine.connect() as conn:
...     res = await anyio.to_thread.run_sync(run_query, conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
```
dhirschfeld commented 1 month ago

It would be great if it were possible to pass an engine to a separate thread, so the same code would work whether you're connected to a Postgres database in production or an in-memory DuckDB database in CI.
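Building on the `Connection` workaround above, one way to keep a single code path today (a sketch; `run_in_thread` is a hypothetical helper, not part of duckdb_engine) is to open the connection on the event-loop side and hand only the `Connection` to the worker:

```python
import anyio
import sqlalchemy as sa

async def run_in_thread(engine: sa.Engine, query: str):
    # Open the connection in the thread that owns the in-memory
    # database, then ship only the Connection to the worker thread.
    with engine.connect() as conn:
        return await anyio.to_thread.run_sync(
            lambda: conn.execute(sa.text(query)).fetchall()
        )
```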

dhirschfeld commented 1 month ago

Calling back into the main thread from the worker thread to open the connection seems to work, but then it only works from a worker-thread context, so it's not ideal.

```python
def run_query(engine: sa.Engine):
    with anyio.from_thread.run_sync(engine.connect) as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()
```
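For completeness, this variant only succeeds when `run_query` is itself dispatched to a worker thread from the event loop, e.g. (output as in the earlier examples):

```python
>>> res = await anyio.to_thread.run_sync(run_query, engine)
>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
```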