long2ice / asynch

An asyncio ClickHouse Python Driver with native (TCP) interface support.
https://github.com/long2ice/asynch
Apache License 2.0
185 stars 43 forks source link

RuntimeError: read() called while another coroutine is already waiting for incoming data #47

Closed remort closed 2 years ago

remort commented 2 years ago

Hi!

I get the following error:

Traceback (most recent call last):
  File "ch_test_asynch.py", line 55, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "ch_test_asynch.py", line 52, in main
    await asyncio.gather(*inserts)
  File "ch_test_asynch.py", line 36, in insert_data
    await cursor.execute(
  File "/usr/local/lib/python3.8/dist-packages/asynch/cursors.py", line 61, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 526, in execute
    await self.force_connect()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 573, in force_connect
    await self.connect()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 470, in connect
    return await self._init_connection(host, port)
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 457, in _init_connection
    await self.receive_hello()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 201, in receive_hello
    server_name = await self.reader.read_str()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 138, in read_str
    length = await self.read_varint()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 124, in read_varint
    await self._read_into_buffer()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 112, in _read_into_buffer
    packet = await self.reader.read(self.buffer_max_size)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

With this minimal code example:

import asyncio

import string

from asynch import connect
from asynch.cursors import DictCursor

async def connect_database():
    conn = await connect(
        host = "0.0.0.0",
        port = 9000,
        database = "waf",
    )
    return conn

async def create_table(client):
    async with client.cursor(cursor=DictCursor) as cursor:
        await cursor.execute('DROP TABLE IF EXISTS test')
        await cursor.execute("""
        CREATE TABLE if not exists waf.test
            (
                `id`       Int32,
                `decimal`  Decimal(10, 2),
                `string`   String

            )
            ENGINE = MergeTree
                ORDER BY id"""
        )

async def insert_data(client):
    async with client.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            """INSERT INTO waf.test(id,decimal,string) VALUES""",
            [
                (
                    1,
                    123.45,
                    'a str',
                ),
            ],
        )

async def main():
    client = await connect_database()
    await create_table(client)
    inserts = [insert_data(client) for f in range(0, 2)]
    await asyncio.gather(*inserts)

if __name__ == '__main__':
    asyncio.run(main())

ClickHouse client version 21.6.6.51 (official build). asynch-0.1.9

remort commented 2 years ago

Same approach with gather works with aiochclient but they use http connections there.

YogiLiu commented 2 years ago

Hi, @remort! Besause asynch comply with PEP249, you can get specifications from the docs .

Cursors created from the same connection are not isolated, i.e., any changes done to the database by a cursor are immediately visible by the other cursors. Cursors created from different connections can or can not be isolated, depending on how the transaction support is implemented (see also the connection’s .rollback() and .commit() methods).

Connection pool may be suitable for you.

remort commented 2 years ago

Checked it with a real app - it works as expected now. Thank you a lot. You can close the issue.

long2ice commented 2 years ago

Glad to hear that is resolved