aio-libs / aiomysql

aiomysql is a library for accessing a MySQL database from the asyncio framework
https://aiomysql.rtfd.io
MIT License

Automatically handle parallel queries with an async `Lock` #852

Open LiraNuna opened 1 year ago

LiraNuna commented 1 year ago

Is your feature request related to a problem?

When performing two "parallel" queries via `asyncio.gather`, an error is thrown: `RuntimeError: readexactly() called while another coroutine is already waiting for incoming data`
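
For illustration, a minimal sketch that reproduces this, assuming a locally running MySQL server and placeholder credentials:

import asyncio

import aiomysql


async def main():
    conn = await aiomysql.connect(host='127.0.0.1', user='root',
                                  password='secret', db='test')
    async with conn.cursor() as cur:
        # Both executes read from the same connection's socket at once,
        # which triggers the RuntimeError above.
        await asyncio.gather(cur.execute("SELECT 1"),
                             cur.execute("SELECT 2"))
    conn.close()


asyncio.run(main())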

Describe the solution you'd like

I have implemented a simple yet effective way to remedy this issue; however, it's quite "gross", requiring monkey patching, as I am unable to perform the required changes in a type-safe manner:

from asyncio import Lock
from typing import Any, Iterable, Optional

from aiomysql import Cursor


class LockedCursor(Cursor):
    def get_lock(self) -> Lock:
        # Lazily attach one shared Lock to the underlying connection.
        if not hasattr(self.connection, 'lock'):
            self.connection.lock = Lock()

        return self.connection.lock

    async def execute(self, query_string: str, args: Optional[Iterable[Any]] = None):
        # Serialize queries so concurrent execute() calls on the same
        # connection cannot interleave reads on its socket.
        async with self.get_lock():
            return await super().execute(query_string, args)
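
For one-off use, and if I'm reading Connection.cursor() right, a cursor subclass can also be passed to it directly, so the monkey patching above is really about making this behavior the default everywhere. A minimal usage sketch, assuming conn is an already-open aiomysql connection:

import asyncio


async def demo(conn):
    async with conn.cursor(LockedCursor) as cur:
        # The shared lock serializes these, so gather() no longer raises.
        await asyncio.gather(cur.execute("SELECT 1"),
                             cur.execute("SELECT 2"))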

I recommend making this behavior standard on the base cursor, as it is often impossible to avoid parallel queries being issued on the same connection.

Describe alternatives you've considered

Another option would involve the connection holding an instance of `asyncio.Queue` to orchestrate any parallel queries going out; however, I personally believe this is overkill and not as simple to understand as a lock.
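
A minimal sketch of what that Queue-based alternative might look like (the QueryWorker name and structure are hypothetical, not anything in aiomysql): each query is paired with a Future, and a single background task executes them one at a time.

import asyncio
from typing import Any, Iterable, Optional


class QueryWorker:
    """Hypothetical helper: serializes queries for one cursor via a queue."""

    def __init__(self, cursor):
        self._cursor = cursor
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        # Single consumer: runs queued queries strictly one after another.
        while True:
            query, args, future = await self._queue.get()
            try:
                future.set_result(await self._cursor.execute(query, args))
            except Exception as exc:
                future.set_exception(exc)

    async def execute(self, query: str, args: Optional[Iterable[Any]] = None):
        # Producers enqueue the query and wait for its result.
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((query, args, future))
        return await future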

Another option I considered is subclassing each of the `Pool`, `Connection`, and `Cursor` classes to inject `LockedCursor` when calling `connection.cursor()`; however, that has proven to be very difficult and full of boilerplate and repetitive code, overriding the constructor and other methods simply to add a lock.

Additional context

I am willing to help prepare a PR if this is a solution that the maintainers would approve of. I am also willing to discuss other potential solutions and approaches.

Nothing4You commented 1 year ago

Hi, the simplest solution for this that requires no code changes is to just `acquire()` a connection from the pool for each task. By providing appropriate `minsize` and `maxsize` values, the pool will automatically take care of creating the necessary number of connections, and `acquire()` will not return until a connection is available.
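
For illustration, a minimal sketch of that per-task acquire() approach (placeholder credentials; minsize/maxsize bound how many connections the pool creates, and acquire() waits until one is free):

import asyncio

import aiomysql


async def run_query(pool, query):
    # Each task borrows its own connection; acquire() waits if none is free.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query)
            return await cur.fetchall()


async def main():
    pool = await aiomysql.create_pool(host='127.0.0.1', user='root',
                                      password='secret', db='test',
                                      minsize=1, maxsize=10)
    print(await asyncio.gather(run_query(pool, "SELECT 1"),
                               run_query(pool, "SELECT 2")))
    pool.close()
    await pool.wait_closed()


asyncio.run(main())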

LiraNuna commented 1 year ago

I don't think this solution will work when the queries need to run within the same connection's transaction, which is my requirement. On our server, we use a connection per request, where each connection is also a transaction we can roll back if the request fails (due to an exception or other error).
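
A minimal sketch of that connection-per-request pattern (handle_request and process are placeholder names; begin()/commit()/rollback() are the regular aiomysql connection methods):

async def handle_request(pool, request):
    async with pool.acquire() as conn:
        await conn.begin()                # one transaction per request
        try:
            result = await process(conn, request)   # placeholder for the real handler
            await conn.commit()
            return result
        except Exception:
            await conn.rollback()         # undo everything this request did
            raise

Any queries issued concurrently inside process() still have to hit the same conn, which is where the RuntimeError comes from.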

Nothing4You commented 1 year ago

Indeed, transactions will get lost with that.

LiraNuna commented 1 month ago

Any progress on this? An `asyncio.Lock` is an elegant solution that is easy to implement. I'm willing to contribute a PR if necessary; I just want to make sure it will get reviewed quickly if I spend the time on this.