reagento / dishka

Cute DI framework with agreeable API and everything you need
https://dishka.readthedocs.io
Apache License 2.0

DB connection always created with pooling with Dishka #173

[Open] theobouwman opened this issue 2 weeks ago

theobouwman commented 2 weeks ago

We are using dishka for DI with SQLAlchemy and Postgres with connection pooling. As you can see in the tracing examples below, there is sometimes a single db connect span, which is correct. But we also have API calls where multiple db connect spans are registered (see the second screenshot). Are we doing something completely wrong?

Here is our DI:

from typing import AsyncIterable

from dishka import Provider, Scope, provide
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

# get_config() and custom_json_serialiser are project-specific helpers.

class DBProvider(Provider):
    @provide(scope=Scope.APP)
    def engine(self) -> AsyncEngine:
        engine = create_async_engine(
            get_config().ASYNC_DB_URL(),
            echo=get_config().QUERY_ECHO,
            echo_pool=get_config().ECHO_POOL,
            pool_pre_ping=True,
            pool_size=get_config().DB_POOL_SIZE,
            json_serializer=custom_json_serialiser
        )

        SQLAlchemyInstrumentor().instrument(
            engine=engine.sync_engine,
            tracer_provider=trace.get_tracer_provider(),
            enable_commenter=True,
            commenter_options={
                "db_driver": True,
                "db_framework": True,
                "opentelemetry_values": True,
            }
        )

        return engine

    @provide(scope=Scope.REQUEST)
    async def session(self, engine: AsyncEngine) -> AsyncIterable[AsyncSession]:
        async with AsyncSession(engine, expire_on_commit=False) as session:
            yield session
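
For reference, this is roughly how the providers above get consumed; a minimal sketch, assuming a plain asyncio entrypoint (the names handle_request and request_container are illustrative, not from the issue):

from dishka import make_async_container

container = make_async_container(DBProvider())

async def handle_request() -> None:
    # Entering the container opens the REQUEST scope: the session provider
    # above runs here, and the session is closed when the scope exits.
    async with container() as request_container:
        session = await request_container.get(AsyncSession)
        # ... use the session via repositories/services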

Screenshot of trace 1:

[image: Screenshot 2024-06-18 at 17 36 17]

Screenshot of trace 2:

[image: Screenshot 2024-06-18 at 17 44 03]

Here is our user_repo code:

from typing import List, Optional

from sqlalchemy import select

# BaseRepository and User are project-specific.

class UserRepository(BaseRepository[User]):

    _model = User

    async def get_by_email(self, email: str) -> Optional[User]:
        # scalar_one_or_none() can return None, hence the Optional return type.
        return (await self.session.execute(
            select(self._model)
            .filter_by(email=email)
            .limit(1)
        )).scalar_one_or_none()

    async def get_all_by_uid(self, uids: List[str]) -> List[User]:
        return (await self.session.execute(
            select(self._model)
            .filter(self._model.uid.in_(uids))
        )).scalars().all()

And here is the user_service code:

class UserService:

    def __init__(self, user_repository: UserRepository) -> None:
        self._user_repository: UserRepository = user_repository

    async def get_user(self, user_uid: str) -> User:
        user = await self._user_repository.get_by_uid(user_uid)
        if user is None:
            raise EntityNotFoundException

        return user

Tishka17 commented 2 weeks ago

Since it is all done in a single trace, I guess it was processed in one handler. The SQLAlchemy session releases its connection to the pool internally at the end of a transaction. On the next query it is obtained again, but depending on what else happens in the service, either an existing connection is reused or a new one is created.
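
To illustrate that lifecycle with a hypothetical sketch (the commit placement is an assumption about what the application code does, not something visible in the issue):

# Each transaction checks a connection out of the pool and releases it on
# commit, so a single handler can register several connect/checkout events.
async with AsyncSession(engine, expire_on_commit=False) as session:
    await session.execute(select(User).limit(1))  # autobegin: connection checked out
    await session.commit()                        # transaction ends: connection back to pool
    await session.execute(select(User).limit(1))  # new transaction: checked out again,
                                                  # reusing a pooled connection or opening a new one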

We need to see the pool size and whether concurrent requests were being processed.
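
One way to inspect this, sketched as a suggestion (assuming the default QueuePool; status() is a real pool method, but the sample output shown is illustrative):

# Reports pool size, checked-in, overflow and checked-out connections
# at the moment of the call.
print(engine.sync_engine.pool.status())
# e.g. "Pool size: 5  Connections in pool: 2 Current Overflow: 0 Current Checked out connections: 3"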

I am also a bit nervous about seeing a 700ms delay in a single span. What was happening there?

Tishka17 commented 1 week ago

I am not sure it is related to dishka. Do you have more questions regarding this?

theobouwman commented 1 week ago

The 700ms delay is because multiple external APIs are being called. I think it is because sessions get opened and closed multiple times in the /auth/token request. After get_by_email and get_all_by_uid the session is closed automatically, so when a session is needed after that, a new one is created. So I need to figure out how to prevent this and keep one session per request.
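
One possible direction, sketched under the assumption that holding a single transaction open for the whole request is acceptable (TransactionalDBProvider is a hypothetical name; this is not a confirmed fix from the thread):

from typing import AsyncIterable

from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

class TransactionalDBProvider(Provider):
    # The APP-scoped engine provider stays the same as above.

    @provide(scope=Scope.REQUEST)
    async def session(self, engine: AsyncEngine) -> AsyncIterable[AsyncSession]:
        async with AsyncSession(engine, expire_on_commit=False) as session:
            # session.begin() keeps one transaction open until the REQUEST
            # scope exits, so a single pooled connection is held for the
            # whole request instead of being re-acquired per query.
            async with session.begin():
                yield session

Note this only helps if the repositories do not call commit() themselves; an explicit commit ends the transaction and releases the connection back to the pool.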