aio-libs / aiomysql

aiomysql is a library for accessing a MySQL database from the asyncio
https://aiomysql.rtfd.io
MIT License

The latest data cannot be queried when I use the pool #519

Open lastshusheng opened 4 years ago

lastshusheng commented 4 years ago

When I use the connection pool, I insert or modify data in the database and then acquire a connection from the pool to query it, but the query still returns the old data. For example:

loop = asyncio.get_event_loop()
async def go():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                           user='root', password='xxx',
                                           db='study', loop=loop)
    while True:
        await asyncio.sleep(2)
        async with pool.acquire() as conn:  # type: aiomysql.Connection
            async with conn.cursor() as cur:
                await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
                # print(cur.description)
                (r,) = await cur.fetchone()
        print(r)
        print(pool.size)

loop.run_until_complete(go())

When I change the code as follows, the query returns the new data:

loop = asyncio.get_event_loop()
async def go():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                           user='root', password='xxx',
                                           db='study', loop=loop)
    while True:
        await asyncio.sleep(1)
        async with await pool.acquire() as conn:  # type: aiomysql.Connection
            async with conn.cursor() as cur:
                await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
                # print(cur.description)
                (r,) = await cur.fetchone()
        pool.release(conn)
        print(r)
        print(pool.size)

loop.run_until_complete(go())

But then I got a warning:

An open stream object is being garbage collected; call "stream.close()" explicitly.

I don't know the proper way to get up-to-date data from the database through the connection pool.

suiyuex commented 4 years ago

@lastshusheng I guess this is a bug. I tested it and found a workaround.

Whenever you get a connection instance, call its select_db API, whether or not you provided the db option at initialization.

The code below is an example:

import asyncio
import time
import aiomysql

db_config = {
    'host': 'localhost',
    'port': 32769,
    'user': 'root',
    'password': 'password here',
    'db': 'db name'
}

query_timeout = 10

async def main():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        my_conn = await my_pool.acquire()
        # important: calling select_db here lets the query see the latest data
        await my_conn.select_db('tdb')
        my_cursor = await my_conn.cursor()

        await my_cursor.execute("select * from tb_users")
        fetch_result = await my_cursor.fetchall()
        print(fetch_result)
        await my_cursor.close()
        my_pool.release(my_conn)
        await asyncio.sleep(2)

    my_pool.close()
    await my_pool.wait_closed()

async def main2():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        async with my_pool.acquire() as my_conn:
            # important: calling select_db here lets the query see the latest data
            await my_conn.select_db('tdb')
            async with my_conn.cursor() as my_cursor:
                await my_cursor.execute("select * from tb_users")
                fetch_result = await my_cursor.fetchall()
                print(fetch_result)
        await asyncio.sleep(2)
    my_pool.close()
    await my_pool.wait_closed()

if __name__ == '__main__':
    asyncio.run(main2())

You can copy, paste, and test it. I hope it helps.

lastshusheng commented 4 years ago

@suiyuex Thanks for your answer. I tried it, but I still get a warning. The difference is that the warning no longer appears on every iteration; it appears on every second iteration of the loop. My output looks like this:

Co用户-cQ6m
1 

An open stream object is being garbage collected; call "stream.close()" explicitly.
Co用户-cQ6m
0 

Co用户-cQ6m
1 

An open stream object is being garbage collected; call "stream.close()" explicitly.
Co用户-cQ6m
0 

And my script looks like this:

pool = await aiomysql.create_pool(host=host, port=port, user=user, password=password, loop=loop)
while True:
    await asyncio.sleep(1)
    async with pool.acquire() as conn:  # type: aiomysql.Connection
        await conn.select_db('study')
        async with conn.cursor() as cur:
            await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
            (r,) = await cur.fetchone()
    print(r)
    print(pool.size, '\n')

This warning appears when I use Python 3.8; it does not appear with Python 3.7.

suiyuex commented 4 years ago

@lastshusheng I ran the code you provided, but there was no warning message.

My code looks like this:

import asyncio
import time
import aiomysql

db_config = {
    'host': 'localhost',
    'port': 32769,
    'user': 'root',
    'password': '*',
}
query_timeout = 10

async def main2():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        async with my_pool.acquire() as my_conn:
            # important: calling select_db here lets the query see the latest data
            await my_conn.select_db('tdb')
            async with my_conn.cursor() as my_cursor:
                await my_cursor.execute("select name from tb_users order by age desc limit 10")
                r = await my_cursor.fetchone()
        print(r)
        print(my_pool.size, '\n')
        await asyncio.sleep(2)
    my_pool.close()
    await my_pool.wait_closed()

if __name__ == '__main__':
    asyncio.run(main2())

[image: output]

My Python version is 3.8.5 (tags/v3.8.5:580fbb0, Jul 20 2020, 15:57:54) [MSC v.1924 64 bit (AMD64)]

This puzzles me, and I'm not sure what causes it. Maybe it is the MySQL version; my MySQL version is:

+-----------+
| version() |
+-----------+
| 8.0.21    |
+-----------+

😵😵😵

But can you get the latest data using the method above?

lastshusheng commented 4 years ago

@suiyuex Yes, I can get the latest data when I use the select_db API. The warning is probably just a bug in Python 3.8.0. I use aiomysql in my Django project instead of the synchronous ORM, and when I saw this warning I was worried about a memory leak in production. In your opinion, could that happen? Finally, are you Chinese?

suiyuex commented 4 years ago

@lastshusheng Don't worry, this will not cause a memory leak. According to the warning message, the stream has been reclaimed by the garbage collector, so it has been destroyed. And yes, I'm Chinese.

matthewswain commented 3 years ago

It looks like you aren't setting "autocommit": True in your db_config. I think this means your select statements will be opening implicit transactions. When a connection is reused, it has an open transaction from the previous query, so it doesn't see the new data.
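
A minimal sketch of that suggestion, assuming the connection options from the original report with autocommit=True added. The names POOL_OPTIONS and fetch_latest_nickname are hypothetical, chosen only for illustration; the table and column come from the first post.

```python
import asyncio

# Connection options from the original report, plus autocommit.
# With autocommit on, each SELECT runs in its own transaction, so a
# reused pooled connection is not stuck inside an older transaction.
POOL_OPTIONS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "xxx",
    "db": "study",
    "autocommit": True,
}


async def fetch_latest_nickname(pool):
    """Return the newest nickname, or None if the table is empty."""
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1"
            )
            row = await cur.fetchone()
    return row[0] if row else None


async def main():
    # Imported here so the helper above can be loaded without the driver.
    import aiomysql

    pool = await aiomysql.create_pool(**POOL_OPTIONS)
    try:
        for _ in range(5):
            print(await fetch_latest_nickname(pool))
            await asyncio.sleep(2)
    finally:
        pool.close()
        await pool.wait_closed()


# asyncio.run(main())  # uncomment to run against a real MySQL server
```

If autocommit is not an option, ending the transaction explicitly after the read (for example with `await conn.commit()`) should have a similar effect, since the next SELECT then starts a new transaction.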

lastshusheng commented 3 years ago


@matthewswain Thanks, I tried it and it worked. I didn't know that transactions affect SELECT statements. I was so stupid.

rs-blashyrkh commented 2 years ago

Setting the proper transaction isolation level on every brand-new connection helped me. I wrapped the connection in my own class with __aenter__ and __aexit__, and I run the query inside __aenter__. Something like this:

        async def __aenter__(self):
            self.context = self.conn_pool.acquire()
            self.conn = await self.context.__aenter__()
            async with self.conn.cursor() as cur:
                await cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
            return self

        async def __aexit__(self, *args):
            return await self.context.__aexit__(*args)
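
For readers who want to try the fragment above, here is one way it might be completed into a self-contained class. The class name ReadCommittedConnection, its constructor, and the latest_nickname helper are assumptions added for illustration; only the two dunder methods follow the fragment. Under READ COMMITTED, each consistent read takes a fresh snapshot, which is why reads on a reused connection can see rows committed in the meantime.

```python
import asyncio


class ReadCommittedConnection:
    """Hypothetical wrapper: acquires a pooled connection and sets its isolation level."""

    def __init__(self, conn_pool):
        self.conn_pool = conn_pool
        self.context = None
        self.conn = None

    async def __aenter__(self):
        # Keep the pool's own context manager so __aexit__ can release
        # the connection back to the pool.
        self.context = self.conn_pool.acquire()
        self.conn = await self.context.__aenter__()
        async with self.conn.cursor() as cur:
            await cur.execute(
                "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"
            )
        return self

    async def __aexit__(self, *args):
        return await self.context.__aexit__(*args)


async def latest_nickname(conn_pool):
    """Example query through the wrapper; table and column are from the first post."""
    async with ReadCommittedConnection(conn_pool) as wrapped:
        async with wrapped.conn.cursor() as cur:
            await cur.execute(
                "SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1"
            )
            row = await cur.fetchone()
    return row[0] if row else None


# Usage against a real server (assumes `pool` came from aiomysql.create_pool):
#     print(asyncio.run(latest_nickname(pool)))
```

The SET SESSION statement is re-sent on every acquire, which costs one extra round trip per use but does not depend on knowing whether a pooled connection has been configured before.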