MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress when executing gino processes in parallel #576

Open shsimeonova opened 4 years ago

shsimeonova commented 4 years ago

Hello! Thank you for your attention in advance.

I'm trying to run tasks in parallel batches with asyncio, gino, and asyncpg.

I have the magic starting from this entry point:

while batch:
    tasks = {
        asyncio.create_task(
            JobService.run_job(
                job,
                connection_per_job=True,
                endpoint_connection=endpoint_request.db_connection,
            )
        ): job
        for job in batch
    }
    result = await asyncio.gather(*tasks)
    # executed_jobs.append({'name': job_instance.name, 'uuid': job_instance.uuid, 'result': job_execution.response})

    start += batch_jobs_count
    batch = jobs[start:start + batch_jobs_count]

endpoint_connection=endpoint_request.db_connection is the endpoint connection started from the point of receiving the request in my application (I'm using an internal util library over asyncpg, quart and gino)
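[Editorial note: the batching pattern above can be sketched in a self-contained form. The names `run_job`, `run_in_batches`, and the integer job list are placeholders, not the original application's code:]

```python
import asyncio

async def run_job(job):
    # Placeholder for JobService.run_job; simulates an I/O-bound task.
    await asyncio.sleep(0)
    return f"done-{job}"

async def run_in_batches(jobs, batch_size):
    results = []
    start = 0
    batch = jobs[start:start + batch_size]
    while batch:
        # One task per job in the batch; gather waits for the whole batch
        # before the next slice is taken.
        tasks = [asyncio.create_task(run_job(job)) for job in batch]
        results.extend(await asyncio.gather(*tasks))
        start += batch_size
        batch = jobs[start:start + batch_size]
    return results

results = asyncio.run(run_in_batches(list(range(5)), batch_size=2))
print(results)  # ['done-0', 'done-1', 'done-2', 'done-3', 'done-4']
```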

Below in JobService.run_job I have:

@classmethod
async def run_job(cls, job: Job, connection_per_job=False, endpoint_connection=None) -> (Job, JobExecution):
    print(f'received {job.name}, time: {datetime.utcnow()}')
    print(f'job: {job.name}, connection: {endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired}')

    if connection_per_job and bool(endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired):
        async with db_adapter.get_db().acquire() as conn:
            print('in context')
            job_instance, job_execution = await cls._run_job_internal(job, conn=conn)
    else:
        job_instance, job_execution = await cls._run_job_internal(job)

    return job_instance, job_execution

I'm trying to make better use of db connections: I first check whether the general endpoint connection is free, use it if so, and otherwise acquire a new one in a context manager. To be sure which connection is used, I pass a bind (connection) parameter to all of my db-related methods (gino usage). They seem to fail on the second connection, the one created in the context manager. I also don't really understand how a connection that was created explicitly can already be in use by another operation (which is what I take the error to mean).

elprans commented 4 years ago

Can you show the full traceback of the exception, please?

shsimeonova commented 4 years ago

Yes, sorry :)

Traceback (most recent call last):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\default_handler.py", line 54, in handle
    endpoint_response = await func(endpoint_request, endpoint_response)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\controllers\model_controllers\job\__init__.py", line 39, in run
    result = await asyncio.gather(*tasks)
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 292, in __wakeup
    future.result()
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 223, in __step
    result = coro.send(None)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 64, in run_job
    job_instance, job_execution = await cls._run_job_internal(job)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 78, in _run_job_internal
    job_execution = await JobExecutionService.create(job, bind=conn)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job_execution.py", line 18, in create
    result_instance = await JobExecution.create(**data, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 53, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 57, in _create
    uuid = await self.__create_pre_operations(request_dict, validate=validate, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 73, in __create_pre_operations
    revision = await cls.revision_cls.create_revision(cls, values, operation_type, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\entity_revision.py", line 70, in create_revision
    await cls.create(bind=bind, **revision_data)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 452, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 487, in _create
    row = await bind.first(q)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 745, in first
    return await conn.first(clause, *multiparams, **params)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 325, in first
    return await result.execute(one=True)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\base.py", line 211, in execute
    context.statement, context.timeout, args, 1 if one else 0
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\asyncpg.py", line 178, in async_execute
    with getattr(conn, "_stmt_exclusive_section"):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\asyncpg\connection.py", line 1841, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
elprans commented 4 years ago

Well, the general rule is that you are not supposed to share a connection between tasks, because PostgreSQL does not support interleaving queries on a single connection. You are trying to get around the critical section check, but that would only work if all code below the check is synchronous, i.e. you are not returning control to the event loop. That's not true in your case, there's plenty of opportunity between your check and the actual database operation for some other task to enter the critical section.

Is there a particular reason why a pool does not work for you?
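[Editorial note: the check-then-await race described above can be simulated without a database. `FakeConnection` below is a toy stand-in for asyncpg's per-connection exclusive section, not the real API; it raises `RuntimeError` where asyncpg would raise `InterfaceError`:]

```python
import asyncio

class FakeConnection:
    """Toy model of a connection that allows one operation at a time."""
    def __init__(self):
        self.busy = False

    async def execute(self):
        # asyncpg's critical section raises InterfaceError here.
        if self.busy:
            raise RuntimeError("another operation is in progress")
        self.busy = True
        await asyncio.sleep(0.01)  # the query is "in flight"
        self.busy = False
        return "ok"

async def task(conn):
    if not conn.busy:           # the check passes for *both* tasks...
        await asyncio.sleep(0)  # ...because control returns to the loop here
        return await conn.execute()

async def main():
    conn = FakeConnection()
    results = await asyncio.gather(task(conn), task(conn), return_exceptions=True)
    return [r if not isinstance(r, Exception) else type(r).__name__ for r in results]

outcome = asyncio.run(main())
print(outcome)  # ['ok', 'RuntimeError']
```

The second task's busy-check succeeds because the first task has not reached `execute()` yet; by the time the second task's `execute()` runs, the first operation is already in progress.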

shsimeonova commented 4 years ago

I'm actually using a pool:

async with db_adapter.get_db().acquire() as conn:
    print('in context')
    job_instance, job_execution = await cls._run_job_internal(job, conn=conn)

db_adapter.get_db() is a wrapper around the gino engine, and using acquire means the connection is always taken from the pool: https://github.com/python-gino/gino/blob/77e19f58e4295729937a46996c0630020b29b9a3/gino/engine.py#L619

shsimeonova commented 4 years ago

Okay, so I started a separate engine specifically for this part of the application, and the error only goes away when there are already records in my table. It seems that in an empty table the first row gets locked until it is populated, while the parallel tasks try to access it too. So I tried checking whether there are records in the table and sleeping for a second, but the error occurs again.

lsabi commented 4 years ago

@shsimeonova do you mind sharing a little more detail on how you did it? I'm facing the very same issue with two SELECT queries.

More specifically, I'm using the following code, which raises the exception you showed:

import asyncio

async with db_pool.acquire() as conn:
    async with conn.transaction():
        res1 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        res2 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        await asyncio.gather(res1, res2)

Though, I guess the problem is that I'm sharing the acquired connection between the two gathered queries.
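[Editorial note: the usual fix for the snippet above is to await the two queries sequentially rather than gathering them on one connection. A minimal sketch, using a toy `FakeConn` (not the real asyncpg API) that enforces the one-operation-at-a-time rule:]

```python
import asyncio

class FakeConn:
    """Toy stand-in for an asyncpg connection; one operation at a time."""
    def __init__(self):
        self._busy = False

    async def fetchval(self, query):
        if self._busy:
            raise RuntimeError("cannot perform operation: another operation is in progress")
        self._busy = True
        await asyncio.sleep(0)
        self._busy = False
        return query.split()[-1]  # pretend result

async def main():
    conn = FakeConn()
    # Sequential awaits: each query finishes before the next one starts,
    # so sharing a single connection is safe.
    res1 = await conn.fetchval("SELECT 1")
    res2 = await conn.fetchval("SELECT 2")
    return res1, res2

print(asyncio.run(main()))  # ('1', '2')
```

If the two queries genuinely need to run concurrently, each one needs its own connection acquired from the pool.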

dony585 commented 3 years ago

@shsimeonova I also faced the same issue using gino (with the asyncpg driver). For me, the issue was acquiring the db connection with reuse=True inside the tasks.

To elaborate, the tasks I ran with asyncio.gather contained code like async with pool.acquire(reuse=True) as conn, which somehow caused a connection to be shared between tasks, and hence the error.

Acquiring connections with reuse=False fixed it for me.

Also, the CRUD methods provided by gino use GinoEngine, which under the hood always acquires connections with reuse=True. So code like User.create(**kwargs) will also fail, because it internally acquires a reusable connection.

For reference https://github.com/python-gino/gino/issues/313
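[Editorial note: the effect of reuse=True vs reuse=False can be illustrated with a deliberately simplified toy pool. This is not gino's actual API or semantics (gino tracks reuse per task context); it only models the observable outcome that reusable acquires can hand several tasks the same underlying connection:]

```python
import asyncio

class FakePool:
    """Toy model: reuse=True hands back one shared connection, reuse=False a fresh one."""
    def __init__(self):
        self._reusable = None
        self._counter = 0

    def acquire(self, reuse=False):
        pool = self

        class _Ctx:
            async def __aenter__(self):
                if reuse and pool._reusable is not None:
                    return pool._reusable  # hand back the shared connection
                pool._counter += 1
                conn = f"conn-{pool._counter}"
                if reuse:
                    pool._reusable = conn
                return conn

            async def __aexit__(self, *exc):
                return False

        return _Ctx()

async def which_conn(pool, reuse):
    async with pool.acquire(reuse=reuse) as conn:
        await asyncio.sleep(0)
        return conn

async def main():
    pool = FakePool()
    shared = await asyncio.gather(*[which_conn(pool, True) for _ in range(3)])
    fresh = await asyncio.gather(*[which_conn(pool, False) for _ in range(3)])
    return shared, fresh

shared, fresh = asyncio.run(main())
print(shared)  # all three tasks got the same connection
print(fresh)   # each task got its own connection
```

Concurrent operations on the shared connection are what trigger the "another operation is in progress" error; distinct connections per task avoid it.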

waydegg commented 3 years ago

@dony585 Which version of asyncpg are you using? In the latest version (0.23.0) pool.acquire() takes no arguments.