long2ice / asyncmy

A fast asyncio MySQL/MariaDB driver with replication protocol support
https://github.com/long2ice/asyncmy
Apache License 2.0
260 stars, 32 forks

When a user cancels a request, the task is cancelled with CancelledError and the connection or cursor is not cleaned up, so the next SQL query receives the result of the previous request. #103

Open bs-101 opened 1 month ago

bs-101 commented 1 month ago

I use the Sanic framework with tortoise-orm==0.19.1, which uses the asyncmy package by default. When a client cancels a request, the connection gets out of sync: the SQL query of the current request receives the result of the query from the previous request. Tortoise then tries to map that data and raises a KeyError.
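
To make the failure mode described above concrete, here is a minimal simulation that uses only asyncio. `FakeConnection` is a hypothetical stand-in, not asyncmy's API: it assumes replies come back in the order the queries were sent and that a reader simply takes the next reply on the wire. Under that assumption, cancelling a query after it was sent but before its reply was read leaves the reply behind for the next caller.

```python
import asyncio


class FakeConnection:
    """Hypothetical toy connection: replies queue up in arrival order."""

    def __init__(self):
        self._replies = asyncio.Queue()

    async def query(self, sql, delay=0.05):
        # "Send" the query: the fake server answers after `delay` seconds.
        loop = asyncio.get_running_loop()
        loop.call_later(delay, self._replies.put_nowait, f"result of {sql}")
        # Read the next reply on the wire, whichever query it belongs to.
        return await self._replies.get()


async def demo():
    conn = FakeConnection()

    # Previous request: cancelled while it is waiting for its reply.
    task = asyncio.create_task(conn.query("previous query"))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Current request reuses the same connection without any reset.
    return await conn.query("current query", delay=0.2)


stale = asyncio.run(demo())
print(stale)
```

In this toy model the second call returns the reply that belonged to the cancelled query, which matches the symptom reported here. Whether asyncmy fails in exactly this way is an assumption, not something the sketch proves.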

log:

Traceback (most recent call last):
  File "handle_request", line 83, in handle_request
    class Sanic(BaseSanic, metaclass=TouchUpMeta):
  File "/app/handler/ad_operation.py", line 180, in get_ad_config
    ad_config = await AdConfigService.get_ad_content_by_pkg_and_group_and_lang(pkg, group, lang, is_new_user)
  File "/app/bll/ad_operation.py", line 73, in get_ad_content_by_pkg_and_group_and_lang
    ).values("ad_content")
  File "/usr/local/lib/python3.7/site-packages/tortoise/queryset.py", line 1623, in _execute
    row[col] = func(row[col])
KeyError: 'ad_content'

Tortoise imports the driver as follows:

import asyncio
from functools import wraps
from typing import Any, Callable, List, Optional, SupportsInt, Tuple, TypeVar, Union

try:
    import asyncmy as mysql
    from asyncmy import errors
    from asyncmy.charset import charset_by_name
except ImportError:
    import aiomysql as mysql
    from pymysql.charset import charset_by_name
    from pymysql import err as errors

bs-101 commented 1 month ago

@long2ice

long2ice commented 1 month ago

Hello, please give me a minimum reproducible example.

bs-101 commented 1 month ago

Okay, here is the simplest example that reproduces the problem. I initialize the MySQL connection pool in the before_server_start method of the Sanic framework:

    app.pool = await asyncmy.create_pool(
        host=app.config["DB_HOST"],
        user=app.config["DB_USER"],
        password=app.config["DB_PASS"],
        db=app.config["DB_NAME"],
        port=3306,
        maxsize=10,
        autocommit=True,
        charset='utf8mb4'
    )

Here is my test endpoint:

app.add_route(handler.test, '/test')
from sanic import response
async def test(request):
    async with request.app.pool.acquire() as conn:
        async with conn.cursor() as cursor:
            sql = "SELECT ip, pkg FROM t_status_im_lb"
            await cursor.execute(sql)
            data = await cursor.fetchall()
            data = [dict(zip([column[0] for column in cursor.description], row)) for row in data]
            return response.json(data)

Here is my test script:

import asyncio
import aiohttp

URL = "http://127.0.0.1:8000/test"

async def send_and_cancel(session, url):
    try:
        async with session.get(url) as response:
            a = await response.text()  # text() is a coroutine method; await it to read the body
            print(response.status)
    except Exception as e:
        print(f'{e}')
        print("Request cancelled on client side.")

async def cancel_request(url):
    async with aiohttp.ClientSession() as session:
        try:
            await asyncio.create_task(send_and_cancel(session, url))
            # a = round(random.uniform(2, 3), 2)
            # await asyncio.sleep(a)
            # task.cancel() 
        except asyncio.CancelledError:
            print("Request cancelled!")

async def main():
    tasks = []
    for i in range(100): 
        task = asyncio.create_task(cancel_request(URL))
        tasks.append(task)

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == '__main__':
    asyncio.run(main())

With the script above, all 100 concurrent requests return results normally. The problem appears when I change the cancel_request method to the following:

import random  # needed for random.uniform below

async def cancel_request(url):
    async with aiohttp.ClientSession() as session:
        try:
            task = asyncio.create_task(send_and_cancel(session, url))
            a = round(random.uniform(2, 3), 2)
            await asyncio.sleep(a)
            task.cancel() 
        except asyncio.CancelledError:
            print("Request cancelled!")

After I sent another 100 requests with this version, the client cancelled each request while asyncmy was still waiting for MySQL to return data, and the endpoint reported an error. The log is as follows:

VISIT [2024-09-23 07:08:56,575][87093][127.0.0.1:37636] GET http://127.0.0.1:8000/test 503 666

Then I switched back to a normal request that is not cancelled and sent it only once. The endpoint still reported an error. The error message is as follows:

ERROR [2024-09-23 07:10:39,038][87093][sanic.error] Exception occurred while handling uri: 'http://127.0.0.1:8000/test'
Traceback (most recent call last):
  File "handle_request", line 83, in handle_request
    class Sanic(BaseSanic, metaclass=TouchUpMeta):
  File "/app/handler/__init__.py", line 54, in test
    await cursor.execute(sql)
  File "asyncmy/cursors.pyx", line 179, in execute
  File "asyncmy/cursors.pyx", line 364, in _query
  File "asyncmy/connection.pyx", line 494, in query
  File "asyncmy/connection.pyx", line 682, in _read_query_result
  File "asyncmy/connection.pyx", line 1069, in read
  File "asyncmy/connection.pyx", line 627, in read_packet
asyncmy.errors.InternalError: Packet sequence number wrong - got 99 expected 1
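
For context on the last line of the traceback: in the MySQL client/server protocol, every packet starts with a 4-byte header made of a 3-byte little-endian payload length and a 1-byte sequence id, and the sequence id restarts for each new command. The sketch below shows the kind of check that yields such a message. The function names are made up for illustration and this is not asyncmy's actual code. One plausible reading of "got 99 expected 1" is that the connection still held unread packets from a cancelled query.

```python
import struct


def parse_packet_header(header: bytes):
    """Split a 4-byte MySQL packet header into (payload_length, sequence_id)."""
    if len(header) != 4:
        raise ValueError("a packet header is exactly 4 bytes")
    # 3-byte little-endian length, read as 2 bytes + 1 byte, then the sequence id.
    low, high, sequence_id = struct.unpack("<HBB", header)
    return low | (high << 16), sequence_id


def next_sequence(expected: int, got: int) -> int:
    """Verify the sequence id and return the id expected for the next packet."""
    if got != expected:
        raise RuntimeError(
            f"Packet sequence number wrong - got {got} expected {expected}"
        )
    return (expected + 1) % 256


# A leftover packet with sequence id 99 arriving where id 1 is expected.
length, seq = parse_packet_header(b"\x05\x00\x00\x63")
try:
    next_sequence(1, seq)
except RuntimeError as exc:
    message = str(exc)
```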

The above is my own test script. I don't know whether I need to call conn.rollback(), but even when I call rollback(), the error is still reported. In the production environment, the endpoint runs two SQL queries, sql1 and sql2. When a client cancels its request and a new request then arrives, sql1 receives the result of sql2 and sql2 receives the result of sql1. I hope you can help me. Thank you. @long2ice
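
One possible mitigation, offered as a sketch and not a confirmed fix, is to run the whole acquire/query/release sequence inside `asyncio.shield()`, so that cancelling the handler does not interrupt the exchange on the connection. `FakeConnection`, `run_query`, and the lock standing in for the pool are hypothetical. Only `asyncio.shield` is a real API here, and whether this helps with Sanic, tortoise-orm, and asyncmy in practice is an assumption.

```python
import asyncio


class FakeConnection:
    """Hypothetical toy connection: replies queue up in arrival order."""

    def __init__(self):
        self._replies = asyncio.Queue()

    async def query(self, sql, delay=0.05):
        loop = asyncio.get_running_loop()
        loop.call_later(delay, self._replies.put_nowait, f"result of {sql}")
        return await self._replies.get()


async def run_query(pool_lock, conn, sql):
    # The lock stands in for pool.acquire(): one user of the connection at a time.
    async with pool_lock:
        return await conn.query(sql)


async def handler(pool_lock, conn, sql):
    # shield() lets the inner task finish even if this handler is cancelled,
    # so the reply is consumed before the connection is handed to anyone else.
    return await asyncio.shield(run_query(pool_lock, conn, sql))


async def demo():
    pool_lock = asyncio.Lock()
    conn = FakeConnection()

    task = asyncio.create_task(handler(pool_lock, conn, "previous query"))
    await asyncio.sleep(0.01)
    task.cancel()  # the client goes away while the query is in flight
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The next request waits for the lock, then gets its own reply.
    return await run_query(pool_lock, conn, "current query")


result = asyncio.run(demo())
print(result)
```

The trade-off is that the cancelled query still runs to completion in the background. Another option would be to discard the connection whenever a query is cancelled, instead of returning it to the pool.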