long2ice / asynch

An asyncio ClickHouse Python Driver with native (TCP) interface support.
https://github.com/long2ice/asynch
Apache License 2.0
185 stars 43 forks source link

AsynchPoolError: no free connection #121

Open itssimon opened 1 month ago

itssimon commented 1 month ago

Based on the deprecation warnings in 0.2.5 I changed my application code from:

async with pool.acquire() as conn:

to:

async with pool.connection() as conn:

After deploying this change my application failed with lots of errors like this:

AsynchPoolError('no free connection in the <Pool(minsize=1, maxsize=10) object at 0x7f5ed1859190; status: PoolStatus.opened>')

Looks like previously pool.acquire() would wait for a free connection, while pool.connection() simply throws an exception. Is this expected behaviour?

itssimon commented 1 month ago

@stankudrow mind taking a look at this? Great work on refactoring the API btw

stankudrow commented 1 month ago

@itssimon , yeah, could you provide a simple reproducible example?

Mine is as follows:

import asyncio

import asynch

async def test_select(conn_ctx, selectee: int) -> None:
    async with conn_ctx as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(f"SELECT {selectee}")
            ret = await cursor.fetchone()
            assert ret == (selectee,)
            return ret

async def main():
    async with asynch.Pool(minsize=1, maxsize=10) as pool:
        tasks: list[asyncio.Task] = []
        for idx in range(pool.maxsize):
            conn_ctx = pool.connection()
            tasks.append(asyncio.create_task(test_select(conn_ctx, idx + 1)))

        result = await asyncio.gather(*tasks)
        print(f"Totals: {result}")

if __name__ == "__main__":
    asyncio.run(main())
stankudrow commented 1 month ago

Ah, there is a problem if you try to acquire more connections than your pool can give you:

import asyncio

import asynch

async def test_select(conn_ctx, selectee: int) -> None:
    async with conn_ctx as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(f"SELECT {selectee}")
            ret = await cursor.fetchone()
            assert ret == (selectee,)
            return ret

async def main():
    async with asynch.Pool(minsize=1, maxsize=10) as pool:
        tasks: list[asyncio.Task] = []
        for idx in range(pool.maxsize + 1):  # HERE THE MAXSIZE GETS EXCEEDED
            conn_ctx = pool.connection()
            tasks.append(asyncio.create_task(test_select(conn_ctx, idx + 1)))

        result = await asyncio.gather(*tasks)
        print(f"Totals: {result}")

if __name__ == "__main__":
    asyncio.run(main())
itssimon commented 1 month ago

Yes I think that's exactly what's happening in my application

stankudrow commented 1 month ago

@itssimon , do you think it was a bad idea to throw the exception when exceeding the number of connection request from the pool? I think it is legitimate to raise an exception when a pool can no longer provide connections.

Could your share your opinion on this point?

itssimon commented 1 month ago

IMO, the previous implementation was preferable and what I'd expect from a connection pool. When trying to acquire a connection from a pool that doesn't have any free connections, it should wait for a connection to become available (with a configurable timeout). In most cases it will just be a matter of milliseconds.

Other connection pool implementations I'm aware of that use this approach are SQLAlchemy and pgBouncer.

SQLAlchemy also allows users to configure an overflow limit, allowing the pool to temporarily create more connections if needed.

stankudrow commented 1 month ago

IMO, the previous implementation was preferable and what I'd expect from a connection pool. When trying to acquire a connection from a pool that doesn't have any free connections, it should wait for a connection to become available (with a configurable timeout). In most cases it will just be a matter of milliseconds.

Other connection pool implementations I'm aware of that use this approach are SQLAlchemy and pgBouncer.

SQLAlchemy also allows users to configure an overflow limit, allowing the pool to temporarily create more connections if needed.

Aight, I'll check other implementations a bit later. If you already have a solution, your PR is welcome, please feel free to contribute.

itssimon commented 1 month ago

The implementation of pool.acquire() should be a good starting point. I'd create a PR but I'm on mobile only for a few days.

stankudrow commented 1 month ago

@itssimon , please have a look at #124 .

stankudrow commented 3 weeks ago

@itssimon , hello. The PR #124 got merged, wouldn't you mind to close this issue?

stankudrow commented 2 weeks ago

@itssimon

itssimon commented 2 weeks ago

@stankudrow, thanks for your work on this repository. However, pestering people with mentions in short succession is not appreciated. Please be patient, people will get to it in their own time.

I have now blocked notifications from you.