Krukov / cashews

Cache with async power
MIT License

Early cache with sqlalchemy & fastapi depend (sqlalchemy.exc.IllegalStateChangeError: Method 'close()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>) #213

Open Like6po opened 3 months ago

Like6po commented 3 months ago

I get the error described in this discussion, https://github.com/sqlalchemy/sqlalchemy/discussions/9312, when I try to use the early cache: sqlalchemy.exc.IllegalStateChangeError: Method 'close()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>

What I do:

  1. Create any FastAPI application that fetches data from the database, using a FastAPI dependency to open the session.
  2. Apply the @early decorator to the function that executes the query through this session.
  3. After early_ttl expires but before ttl is reached, call the API method; you will get the error.
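
To make the timing in step 3 concrete, here is a simplified model of the window in which the error shows up. It is only a sketch of the idea, not the cashews implementation: the function name `cache_state` and its three labels are hypothetical, and the numbers mirror the ttl="10m" / early_ttl="1s" values used in the code further down.

```python
def cache_state(age: float, early_ttl: float, ttl: float) -> str:
    """Classify a cached entry by its age in seconds (illustrative model only)."""
    if age < early_ttl:
        return "fresh"          # served from cache, nothing is recalculated
    if age < ttl:
        return "early-refresh"  # served from cache, recalculation is scheduled in the background
    return "expired"            # entry is gone, the value is recalculated by the caller


# ttl = 10 minutes, early_ttl = 1 second
states = [cache_state(age, early_ttl=1, ttl=600) for age in (0.5, 5, 700)]
print(states)
```

Only the middle state involves a background recalculation, which is why the error appears between early_ttl and ttl and not before or after.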

I took chunks of code from my application to show roughly how the problem can be reproduced. It is not complete code that can be run as is, but I think the essence of the problem is clear.

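#
# some depends.py
#
from typing import Annotated, TypeAlias

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

# `settings` is the application's own config object (not shown here)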

engine = create_async_engine(
    url=settings.POSTGRES_DSN,
    poolclass=NullPool,
    echo=True,
)

SessionFactory = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

async def create_session():
    async with SessionFactory() as session:
        yield session

InjectSession: TypeAlias = Annotated[AsyncSession, Depends(create_session)]

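#
# some repositories.py
#
from typing import Annotated, TypeAlias
from uuid import UUID

from fastapi import Depends
from sqlalchemy import select

# UsersModel and InjectSession come from the application (not shown here)


class BaseRepository: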
    def __init__(self, session: InjectSession):
        self._session = session

class UsersRepository(BaseRepository):
    async def get_by_uuid(self, uuid: UUID) -> UsersModel | None:
        return await self._session.scalar(
            select(UsersModel).where(UsersModel.uuid == uuid),
        )

InjectUsersRepository: TypeAlias = Annotated[UsersRepository, Depends()]

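#
# some services.py
#
from typing import Annotated, TypeAlias
from uuid import UUID

from cashews import cache
from fastapi import Depends

# UserDTO, BaseNotFoundHTTPException and InjectUsersRepository come from the application (not shown here)


class UsersService: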
    def __init__(self, users_repository: InjectUsersRepository):
        self._users_repository = users_repository

    # early_ttl is kept small to demonstrate the error
    @cache.early(ttl="10m", key="users:uuid:{uuid}", early_ttl="1s")
    async def get_by_uuid(self, uuid: UUID) -> UserDTO:
        user = await self._users_repository.get_by_uuid(uuid=uuid)
        if not user:
            raise BaseNotFoundHTTPException()
        return UserDTO.model_validate(user)

InjectUsersService: TypeAlias = Annotated[UsersService, Depends()]

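#
# some routers.py
#
from uuid import UUID

from fastapi import APIRouter

# UserDTO, UserResponseSchema and InjectUsersService come from the application (not shown here)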

router = APIRouter(prefix="/users", tags=["users"])

@router.get("/{user_uuid}", response_model=UserResponseSchema)
async def get_current_user(user_uuid: UUID, users_service: InjectUsersService) -> UserDTO:
    # The service raises BaseNotFoundHTTPException itself when the user is missing.
    return await users_service.get_by_uuid(uuid=user_uuid)

#
# some main.py
#
import uvicorn
from fastapi import FastAPI

from routers import router

app = FastAPI()
app.include_router(router)

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host=settings.HOST,
        port=settings.PORT,
        reload=settings.is_local(),
    )

I think the cause of this bug is in /cashews/decorators/cache/early.py, line 99.


For now I will use plain cache. early looks good and I would like to use it, but it does not work for my case :c

Krukov commented 2 months ago

Hello @Like6po, sorry for not paying attention to this for so long.

So yes, I understand the problem: the session/connection lives only within the scope of the request.

Yes, it's a bug and you identified it correctly. I have been thinking about this ugly 'create_task' stuff. I'll fix it, thanks for reporting.
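
For readers following along, the interaction described above can be reproduced without a database. The sketch below is a simulation under assumptions, not cashews or SQLAlchemy code: `FakeSession`, `create_session`, and `handle_request` are hypothetical stand-ins, and a plain `RuntimeError` takes the place of `IllegalStateChangeError`. It shows a refresh scheduled with `asyncio.create_task` that is still pending when the request-scoped session is closed.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Stand-in for a request-scoped database session (not SQLAlchemy)."""

    def __init__(self) -> None:
        self.closed = False

    async def scalar(self, query: str) -> str:
        await asyncio.sleep(0.01)  # simulate a round trip to the database
        if self.closed:
            raise RuntimeError("session was closed while the query was in progress")
        return f"result of {query}"


@asynccontextmanager
async def create_session():
    session = FakeSession()
    try:
        yield session
    finally:
        session.closed = True  # the dependency closes the session when the request ends


async def handle_request() -> asyncio.Task:
    """Simulates a request that lands in the early-refresh window."""
    async with create_session() as session:
        # The cached value would be returned immediately; the refresh is only scheduled.
        refresh = asyncio.create_task(session.scalar("select user"))
    # The request scope has ended here, but the refresh task has not finished.
    return refresh


async def main() -> str:
    refresh = await handle_request()
    try:
        await refresh
    except RuntimeError as exc:
        return f"refresh failed: {exc}"
    return "refresh succeeded"


outcome = asyncio.run(main())
print(outcome)
```

Until a fix lands, one possible workaround (untested against the real application) is to have the cached function open and close its own session instead of receiving the request-scoped one, so that a background refresh does not depend on the lifetime of the request.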