aio-libs / aiomysql

aiomysql is a library for accessing a MySQL database from the asyncio
https://aiomysql.rtfd.io
MIT License
1.75k stars 255 forks source link

Similarly, the query results of the sql statements aiomysql and navicat are inconsistent(同样sql语句aiomysql和navicat查询结果不一致) #973

Closed zwb0619 closed 9 months ago

zwb0619 commented 9 months ago

config/mysql.py # aiomysql program encapsulation

-- coding: utf-8 --

import asyncio import aiomysql

from config.globalParams import mysqlParams

_instance = None _used = set()

class Database: MAX_RECONNECT_ATTEMPTS = 3 # 最大重新连接次数

@staticmethod
def get_instance():
    return _instance

@staticmethod
def init_instance(instance):
    global _instance
    _instance = instance

def __init__(self):
    self.pool = None

async def connect(self):
    reconnect_attempts = 0  # 重新连接尝试次数
    while reconnect_attempts < self.MAX_RECONNECT_ATTEMPTS:
        try:
            self.pool = await aiomysql.create_pool(
                # **mysqlParams
                host='192.168.66.123',
                port=33060,
                user='root',
                password='123456',
                db='zhtz',
                charset='utf8mb4',
                autocommit=False,
                echo=True,
                cursorclass=aiomysql.DictCursor,
                minsize=20,
                maxsize=25
            )
            break  # 成功连接时跳出循环
        except Exception as e:
            # 处理连接失败的情况,例如记录日志
            print(f"连接数据库失败: {e}")
            reconnect_attempts += 1
    else:
        print("无法连接到数据库")

# 获取链接
async def get_connection(self):
    conn = await self.pool.acquire()
    _used.add(conn)
    return conn

# 释放链接
async def release(self, conn):
    _used.remove(conn)
    await self.pool.release(conn)

# 关闭链接
async def close(self):
    print(self.pool)
    self.pool.close()
    await self.pool.wait_closed()

# 提交
async def commit(self, conn):
    await conn.commit()
    await self.release(conn)

# 回滚
# @staticmethod
async def rollback(self, conn):
    await conn.rollback()
    await self.release(conn)

# 单个查询
async def query(self, sql, params=None):
    conn = await self.get_connection()
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(sql, params)
            result = await cursor.fetchall()
            return result
    except Exception as err:
        print('数据库查询失败', err)
        print('错误操作语句:', cursor.mogrify(sql, params))
        return False
    finally:
        await self.release(conn)

# 多个查询
async def multi_query(self, queries):
    conn = await self.get_connection()
    try:
        # async with conn.cursor() as cursor:
        tasks = []
        for sql, params in queries:
            task = self.query(sql, params)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        if False in results:
            return False
        else:
            return results
    except Exception as err:
        print('数据库查询失败', err)
        # print("错误操作语句:", sql)
        return False
    finally:
        await self.release(conn)

async def modify(self, sql, params=None):
    conn = await self.get_connection()
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(sql, params)
            await self.commit(conn)
            return True
    except Exception as e:
        print("数据库操作失败:", e)
        print('错误操作语句:', cursor.mogrify(sql, params))
        await self.rollback(conn)
        return False

# 事务操作
async def multi_modify(self, modifies):
    conn = await self.get_connection()
    try:
        await conn.begin()
        async with conn.cursor() as cursor:
            for sql, params in modifies:
                await cursor.execute(sql, params)
        await self.commit(conn)
        return True
    except Exception as err:
        print('数据库操作失败.....', err)
        print("错误操作语句:", sql, params)
        await self.rollback(conn)
        return False

resources.py

-- coding: utf-8 --

import asyncio import json from func import treeProcess, getSnowFlakeId from fastapi import APIRouter, Request from config.mysql import Database

router = APIRouter() db = Database.get_instance()

async def get_mysql_connection(): global db db = Database.get_instance() return db

使用aiomysql

获取所有资源菜单

@router.post("/api/v1/system/resources/getResourcesList/") async def getResourcesList(request: Request): sql = "SELECT a.res_id as id,a.parent_id,a.res_name,a.res_title,a.res_path,a.sort,a.res_type,a.icon,a.res_desc, \ b.static_data_value_name as res_type_name \ from base_resources_info a left JOIN (select static_data_value_id,static_data_value_name \ from base_public_static_data_value where static_data_id = (select static_data_id from base_public_static_data \ where is_delete = %s and static_data_name = %s)) as b on b.static_data_value_id = a.res_type \ where a.res_type in %s order by a.parent_id " params = (0, "资源类型", (1, 2)) if db is None: await get_mysql_connection() data = await db.query(sql, params)

print('data:', data)

if data is False:
    return {"errCode": 1, "errMsg": "系统故障,请联系管理员或稍后再试"}
elif data == ():
    return {"errCode": 2, "errMsg": "无菜单资源数据"}
else:
    result = await treeProcess.menu_to_tree(data, "0")
    # print('result:', result)
    if result is False:
        return {"errCode": 1, "errMsg": "系统故障,请联系管理员或稍后再试"}
    else:
        top_parent = {
            "id": "0",
            "parent_id": "",
            "res_name": "top",
            "res_title": "无上级",
            "parent_name": "",
            "res_path": "/",
            "sort": "0",
            "res_type": "1",
            "icon": "",
            "res_type_name": "菜单",
            "des_desc": "",
            "children": result,
            "client_id": "",
        }
        res = [top_parent]
        return {"errCode": 0, "errMsg": "ok", "datalist": res}

删除资源

@router.post("/api/v1/system/resources/delResource/") async def delResource(request: Request): body = await request.body() receive = json.loads(body) res_id = receive["res_id"] sql = "select count(*) as num from base_resources_info where parent_id = %s" params = (f"{res_id}",) if db is None: await get_mysql_connection() result = await db.query(sql, params) if result is False: return {"errCode": 1, "errMsg": "系统故障,请联系管理员或稍后再试"} elif result[0]["num"] > 0: return {"errCode": 2, "errMsg": "要删除的资源菜单有下级资源菜单,不允许删除,要删除请先删除下级资源菜单"} else: sql2 = "delete from base_resources_info where res_id = %s" result2 = await db.modify(sql2, params) if result2 is False: return {"errCode": 1, "errMsg": "系统故障,请联系管理员或稍后再试"} else: sql3 = "delete from base_role_resources where res_id = %s" await db.modify(sql3, params)
return {"errCode": 0, "errMsg": "ok"}

async def main(): await asyncio.sleep(10) await get_mysql_connection()

if name == "main": loop = asyncio.get_event_loop() try: loop.create_task(main()) finally: loop.close()

func/treeProcess.py

-- coding: utf-8 --

import asyncio from config.mysql import Database

db = Database.get_instance()

async def get_mysql_connection(): global db db = Database.get_instance() return db

async def menu_to_tree(data, parent_id): tree = [] if db is None: await get_mysql_connection() for item in data: if item["parent_id"] == parent_id: res_id = item["id"] sql = "select res_title as parent_name from base_resources_info where res_id = %s" params = (f"{res_id}",) parent = await db.query(sql, params) print(parent) if parent is False: return False elif parent == (): item["parent_name"] = "无上级" else: item["parent_name"] = parent[0]["parent_name"] next_pid = item["id"] item["children"] = await menu_to_tree(data, next_pid) num = len(item["children"]) if num == 0: del item["children"] tree.append(item) return tree

async def main(): await asyncio.sleep(10) await get_mysql_connection()

if name == "main": loop = asyncio.get_event_loop() try: loop.create_task(main()) finally: loop.close()

web端调用delResource,日志看到有执行下面语句,对应数据库中res_title为组织权限 delete from base_resources_info where res_id = '5e283397-be88-11ed-97cb-0242ac110002' delete from base_role_resources where res_id = '5e283397-be88-11ed-97cb-0242ac110002'

delResource运行成功,没有任何报错,return {"errCode": 0, "errMsg": "ok"}后,web端调用getResourcesList,日志: SELECT a.res_id as id,a.parent_id,a.res_name,a.res_title,a.res_path,a.sort,a.res_type,a.icon,a.res_desc, b.static_data_value_name as res_type_name from base_resources_info a left JOIN (select static_data_value_id,static_data_value_name from base_public_static__value where static_data_id = (select static_data_id from base_public_static_data where is_delete = 0 and static_data_name = '资源类型')) as b on b.static_data_value_id = a.res_type where a.res_type in (1,2) order by a.parent_id

并且在执行menu_to_tree有sql日志: select res_title as parent_name from base_resources_info where res_id = '5e283397-be88-11ed-97cb-0242ac110002' [{'parent_name': '组织权限'}]

在执行delResource时,已经将res_title为组织权限删除,为什么getResourcesList和menu_to_tree仍然能查询到这条记录? 用navicat客户端执行日志中打印的sql语句,结果中并无res_title为组织权限的这条记录。

在执行完delResource后,顺序调用getResourcesList,有时候结果正确和mysql的记录一致,有时候会多出res_title为组织权限的这条数据,并无固定规律。

python:3.9.6 aiomysql:0.2.0 PyMySQL:1.1.0 fastapi:0.105.0

show variables like '%query_cache%' result:No

排查这个问题三天了。整个程序运行没有任何报错,为什么会出现这种已经删除的数据,仍然能查询到的情况?

zwb0619 commented 9 months ago

刚刚无意中发现问题了 autocommit设为True,就没问题了。 设置为Flase,aiomysql的查询就有问题。

我代码中是设为Flase,手动执行commit,aiomysql也执行了commit操作,因为mysql数据变了。这是bug?