encode / databases

Async database support for Python. 🗄
https://www.encode.io/databases/
BSD 3-Clause "New" or "Revised" License

MySQL - asyncio.Lock get error #118

Open watsy0007 opened 5 years ago

watsy0007 commented 5 years ago

When I use databases with aiomysql, I get: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop

detail:

Traceback (most recent call last):
  File "/app/services/currency.py", line 120, in update_currency_ticker_by_id
    return await database.execute(q)
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 122, in execute
    return await connection.execute(query, values)
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 209, in execute
    async with self._query_lock:
  File "/usr/local/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
    await self.acquire()
  File "/usr/local/lib/python3.7/asyncio/locks.py", line 192, in acquire
    await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop

packages:

aiomysql==0.0.20
databases==0.2.3
alembic==1.0.11
fastapi==0.31.0

db.py

from databases import Database
database = Database(DB_URL, min_size=5, max_size=20)

The code that executes the query:


async def update_currency_ticker_by_id(currency_id: int, ticker: CurrencyTickerUpdateRequest):
    tbl_c = currency
    values = {k: v for k, v in ticker.dict().items() if v is not None}
    if values:
        q = tbl_c.update().where(tbl_c.c.id == currency_id).values(values)
        try:
            return await database.execute(q)
        except Exception as e:
            print('update error', e, "detail\n", traceback.format_exc())

This runs in Docker with the base image python 3.7.3-alpine.

Why does this happen, and how can I fix it? It's urgent.
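For context on the message itself: asyncio raises this RuntimeError when a task running on one event loop awaits a future that was created on another loop. The traceback above ends in asyncio.Lock.acquire() at "await fut", which is consistent with the lock's future belonging to a different loop than the one uvicorn runs the request on. Below is a minimal sketch of that mechanism using only the standard library. The names reproduce, loop_a, loop_b and waiter are made up for illustration and do not come from databases or uvicorn.

```python
import asyncio


def reproduce() -> str:
    """Await a future owned by loop_a from a task that runs on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    fut = loop_a.create_future()  # this future belongs to loop_a

    async def waiter() -> str:
        try:
            await fut  # the task executing this line runs on loop_b
        except RuntimeError as exc:
            return str(exc)
        return "no error"

    try:
        return loop_b.run_until_complete(waiter())
    finally:
        loop_a.close()
        loop_b.close()


if __name__ == "__main__":
    print(reproduce())
```

This only shows where the message comes from. It does not say which object in the application above ended up on the wrong loop.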

lexee commented 5 years ago

I got this error too. Is there any update here?

tomchristie commented 5 years ago

Can either of you reduce this to a really simple, reproducible example case? @lexee Are you also seeing this using FastAPI same as @watsy0007, or not?

watsy0007 commented 5 years ago

from fastapi import FastAPI
from databases import Database
from os import environ as env

database = Database(env.get('DB_URL', 'mysql://root:@127.0.0.1:3306/db'), min_size=5, max_size=10)

app = FastAPI()

@database.transaction()
async def blocked():
    return {}

@app.get('/')
async def index():
    return await blocked()

@app.on_event('startup')
async def setup_db():
    await database.connect()

test.py

import threading
import requests

# Send five requests concurrently, one thread per request.
for _ in range(5):
    threading.Thread(target=requests.get, args=("http://127.0.0.1:8000",)).start()

When I comment out @database.transaction(), it works.

@lexee @tomchristie
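One detail of the example above that may be relevant: a decorator expression such as @database.transaction() is evaluated when the module is imported, which is before the startup handler runs and before the server's event loop is serving requests. Whether that is what triggers the error is not established in this thread. The sketch below only demonstrates the timing, using a hypothetical FakeDatabase stand-in rather than the real databases.Database.

```python
import asyncio
import functools

events = []


class FakeDatabase:
    """Hypothetical stand-in; it only records when transaction() is called."""

    def transaction(self):
        events.append("transaction() evaluated")

        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                events.append("handler called")
                return await func(*args, **kwargs)

            return wrapper

        return decorator


database = FakeDatabase()


@database.transaction()  # evaluated right here, while the module is loading
async def blocked():
    return {}


events.append("module loaded")


async def main():
    events.append("loop running")
    return await blocked()


result = asyncio.run(main())
```

After this runs, events shows that transaction() was evaluated before the module finished loading and before any loop was running. Any loop-bound state created inside such a call would therefore be created outside the server's loop.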

tomchristie commented 5 years ago

Any reason the issue got closed?

ningzio commented 4 years ago

i have the same problem, do we have any solution?

tomchristie commented 4 years ago

@NingziSlay Can you confirm if this is specific to MySQL?

ningzio commented 4 years ago

> @NingziSlay Can you confirm if this is specific to MySQL?

Sorry about my poor English; I hope you can understand. Yes, I only use MySQL in my project.

I use FastAPI too. If requests come in one at a time, it works fine, but it fails when requests arrive concurrently.
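The "one by one works, concurrent fails" pattern fits how a lock behaves. An uncontended acquire returns immediately and never touches the event loop. A contended acquire has to wait on a future, and on Python 3.7 asyncio.Lock picks its event loop when it is constructed, so that future comes from the lock's loop. The sketch below uses EagerLock, a simplified model written for this note (not the real asyncio.Lock and not code from databases), to show the difference. The name import_time_loop is likewise an assumption about where the lock was created.

```python
import asyncio
import collections


class EagerLock:
    """Toy lock that captures its event loop when it is constructed."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._locked = False
        self._waiters = collections.deque()

    async def acquire(self) -> None:
        if not self._locked:
            # Fast path: nobody holds the lock, so the captured loop is unused.
            self._locked = True
            return
        # Slow path: wait on a future that belongs to the captured loop.
        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
            await fut
        finally:
            self._waiters.remove(fut)
        self._locked = True

    def release(self) -> None:
        self._locked = False
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(None)  # wake the first waiter still pending
                break


async def query(lock: EagerLock, results: list) -> None:
    try:
        await lock.acquire()
    except RuntimeError as exc:
        label = "different loop" if "different loop" in str(exc) else "other"
        results.append("error: " + label)
        return
    try:
        await asyncio.sleep(0)  # pretend to talk to the database
        results.append("ok")
    finally:
        lock.release()


def run_demo(concurrent: int) -> list:
    import_time_loop = asyncio.new_event_loop()  # loop the lock was built with
    lock = EagerLock(import_time_loop)
    results = []

    async def main() -> None:
        await asyncio.gather(*(query(lock, results) for _ in range(concurrent)))

    try:
        asyncio.run(main())  # the "server" loop, which is a different loop
    finally:
        import_time_loop.close()
    return results
```

With a single query the fast path is taken and nothing goes wrong. With two overlapping queries the second has to wait and hits the loop mismatch.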

Here is main.py:

from fastapi import FastAPI
from apps import apis
from libs.db_tools import session

app = FastAPI()

app.include_router(apis, prefix="/api")

@app.on_event("startup")
async def startup():
    await session.connect()

@app.on_event("shutdown")
async def shutdown():
    await session.disconnect()

if __name__ == '__main__':
    import uvicorn

    uvicorn.run(
        app,
        host='0.0.0.0',
        port=8080,
    )

libs/db_tools.py

import databases

session = databases.Database("mysql+pymysql://root:root@192.168.0.61/beat_dev?charset=utf8&use_unicode=1")

I wrote this handler:

from libs.db_tools import session
...
@router.get("/test")
async def test():
    query = Track.select()
    return await session.fetch_all(query)

and ran ab:

ab -n 4 -c 2 http://127.0.0.1:8000/test

which raises this exception:

...
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 178, in __call__
    raise exc from None
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 156, in __call__
    await self.app(scope, receive, _send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 73, in __call__
    raise exc from None
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 62, in __call__
    await self.app(scope, receive, sender)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 590, in __call__
    await route(scope, receive, send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 208, in __call__
    await self.app(scope, receive, send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/fastapi/routing.py", line 133, in app
    raw_response = await dependant.call(**values)
  File "/Users/ningzi/workspace/beats/apps/access.py", line 68, in test
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 131, in fetch_all
    return await connection.fetch_all(query, values)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 217, in fetch_all
    async with self._query_lock:
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
    await self.acquire()
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 192, in acquire
    await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:385> cb=[set.discard()]> got Future <Future pending> attached to a different loop

ariesdevil commented 4 years ago

Is there any progress on this issue?

ningzio commented 4 years ago

I changed my code like this, and it works:

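import databases

# settings is the project's own configuration module.
class SessionMaker(object):
    _session = None

    @classmethod
    def get_session(cls):
        # Create the Database on first use instead of at import time.
        if cls._session is None:
            cls._session = databases.Database(settings.DB_URL)
        return cls._session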

In main.py:

@app.on_event("startup")
async def startup():
    session = SessionMaker.get_session()
    await session.connect()

@app.on_event("shutdown")
async def shutdown():
    session = SessionMaker.get_session()
    await session.disconnect()
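The thread does not explain why this workaround helps. One plausible reading is that the object is now created on first use, inside the startup handler, so anything it binds to an event loop is bound to the loop the server is actually running. The sketch below illustrates that idea with a hypothetical FakeDatabase stand-in that simply remembers the loop it was created on. It assumes nothing about the internals of databases.Database.

```python
import asyncio
from typing import Optional


class FakeDatabase:
    """Hypothetical stand-in that remembers the loop it was created on."""

    def __init__(self) -> None:
        # get_running_loop() raises RuntimeError when no loop is running,
        # so this object can only be built from inside a coroutine.
        self.loop = asyncio.get_running_loop()


class SessionMaker:
    _session: Optional[FakeDatabase] = None

    @classmethod
    def get_session(cls) -> FakeDatabase:
        if cls._session is None:
            cls._session = FakeDatabase()  # created lazily, on first call
        return cls._session


async def startup() -> None:
    SessionMaker.get_session()  # first call happens inside the running loop


async def handler() -> bool:
    # Every later call returns the same object, bound to the current loop.
    return SessionMaker.get_session().loop is asyncio.get_running_loop()


async def main() -> list:
    await startup()
    return await asyncio.gather(handler(), handler())


same_loop = asyncio.run(main())
```

Here same_loop holds one boolean per handler call, each telling whether the shared object lives on the loop that served the call.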