long2ice / asynch

An asyncio ClickHouse Python Driver with native (TCP) interface support.
https://github.com/long2ice/asynch
Apache License 2.0
186 stars 43 forks source link

Error when toggle stream_results=True #24

Open bralbral opened 3 years ago

bralbral commented 3 years ago

How can i get data by chunks?

Code:

    conn = await connect(
        host=settings.CLICKHOUSE_BACKEND_HOST,
        port=settings.CLICKHOUSE_BACKEND_TCP_PORT,
        user=settings.CLICKHOUSE_BACKEND_USER,
        password=settings.CLICKHOUSE_BACKEND_PASSWORD,
        database="datasets",
        send_receive_timeout=1000,
    )

    async with conn.cursor(cursor=DictCursor) as cursor:
        cursor.set_stream_results(stream_results=True, max_row_buffer=1000)
        await cursor.execute(
            """
                SELECT *
                FROM ontime
        """
        )
        for row in cursor.fetchall():
            print(row)

Traceback:


  File "C:\Users\vasya\PycharmProjects\mielpops\app\hooks.py", line 21, in clickhouse_exec
    await cursor.execute(
          │      └ <function Cursor.execute at 0x000002A45AFB2040>
          └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

  File "C:\Users\vasya\PycharmProjects\mielpops\venv\lib\site-packages\asynch\cursors.py", line 59, in execute
    execute, execute_kwargs = self._prepare()
                              │    └ <function Cursor._prepare at 0x000002A45AFB2430>
                              └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

  File "C:\Users\vasya\PycharmProjects\mielpops\venv\lib\site-packages\asynch\cursors.py", line 181, in _prepare
    self.settings = self.settings or {}
    │               └ <asynch.cursors.Cursor object at 0x000002A45BC59790>
    └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

AttributeError: 'Cursor' object has no attribute 'settings'
bukata-sa commented 2 years ago

There are more problems with stream results than that I faced problem here self.settings = self.settings or {} Solved it that way:

collections_cursor.set_stream_results(True, 500000)
collections_cursor._settings = {"max_block_size": 500000}
collections_cursor.settings = collections_cursor._settings

Next error:

  File "/venv/lib/python3.8/site-packages/asynch/cursors.py", line 63, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
TypeError: execute_iter() got an unexpected keyword argument 'args'

I tried to hide args param:

def fake_execute_iter(orig_method):
    async def do(
            query,
            args=None,
            params=None,
            with_column_types=False,
            external_tables=None,
            query_id=None,
            settings=None,
            types_check=False):
        return await orig_method(query, params, with_column_types, external_tables, query_id, settings, types_check)

    return do

Last exception got from here:

  File "/venv/lib/python3.8/site-packages/asynch/cursors.py", line 65, in execute
    self._process_response(response)
  File "/venv/lib/python3.8/site-packages/asynch/cursors.py", line 82, in _process_response
    columns_with_types = next(response)
  File "/venv/lib/python3.8/site-packages/asynch/proto/connection.py", line 857, in iter_receive_result
    for rows in IterQueryResult(gen, with_column_types=with_column_types):
  File "/venv/lib/python3.8/site-packages/asynch/proto/result.py", line 117, in next
    packet = next(self.packet_generator)
TypeError: 'async_generator' object is not an iterator

Haven't tried any further

jiejieling commented 1 year ago

please try the last version again

Simon-Chenzw commented 4 months ago

seems works well

    async with conn.cursor() as cursor:
        cursor.set_stream_results(stream_results=True, max_row_buffer=100)
        await cursor.execute("SELECT number, sleepEachRow(0.02) FROM system.numbers limit 10000")
        async for row in cursor:
            print(row)