long2ice / asynch

An asyncio ClickHouse Python Driver with native (TCP) interface support.
https://github.com/long2ice/asynch
Apache License 2.0
177 stars 41 forks source link

Bug IndexError #69

Open KAINeDEZZ opened 1 year ago

KAINeDEZZ commented 1 year ago
class AsynchContexManager:
    """Async context manager owning one ``asynch`` connection.

    Entering the context connects to ``url`` and yields the connection;
    leaving the context closes that connection.
    """

    def __init__(self, url: str):
        # Nothing is opened here; the connection is created in ``__aenter__``.
        self.connection = None
        self.url = url

    async def __aenter__(self) -> asynch.connection.Connection:
        """Connect to ``self.url``, remember the connection and return it."""
        opened = await asynch.connect(self.url)
        self.connection = opened
        return opened

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stored connection.

        The exception arguments are not inspected and nothing is returned,
        so an exception raised inside the ``async with`` body propagates.
        """
        await self.connection.close()

def clickhouse_cursor(func: Callable[Concatenate[asynch.connection.Connection, P], T]) -> Callable[P, T]:
    """Decorate an async callable so it receives a ClickHouse ``DictCursor``.

    The connection URL is built once, when the decorator is applied. Every
    call of the decorated function opens a new connection through
    ``AsynchContexManager``, opens a ``DictCursor`` on it, and awaits
    ``func(*args, **kwargs, cursor=cursor)``. The whole call (connect, cursor,
    ``func``) is retried by ``backoff`` on ``ClickHouseException`` or
    ``DatabaseError`` with ``max_tries=3`` and a constant ``interval=90``.

    NOTE(review): the annotation describes a leading positional
    ``Connection`` argument, while the wrapper actually injects a cursor via
    the ``cursor`` keyword - confirm which contract is intended.
    """
    base_url = clickhouse.get_connection_url()
    url = f"{base_url}?os_thread_priority=19&max_threads=4"

    # Retry policy applied around the complete connect-and-call sequence.
    retry_policy = backoff.on_exception(
        backoff.constant,
        (ClickHouseException, DatabaseError),
        max_tries=3,
        interval=90,
    )

    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        # The cursor is closed first, then the connection, when the block exits.
        async with AsynchContexManager(url) as connection, connection.cursor(
            cursor=cursors.DictCursor
        ) as cursor:
            return await func(*args, **kwargs, cursor=cursor)

    # functools.wraps stays outermost so the result carries func's metadata.
    return functools.wraps(func)(retry_policy(wrapper))
class Puller:
    """Reads rows from a ClickHouse table with a LIMIT/OFFSET query.

    The visible code reads ``self.table_name``, ``self.limit`` and
    ``self.pointer``; where these attributes are set is not shown here.
    """

    # Decorator order: ``clickhouse_cursor`` is outermost, so it opens the
    # connection and cursor once per call, and the IndexError retry below
    # re-runs ``pull`` with that same cursor.
    # NOTE(review): no ``max_tries``/``max_time`` is given for the IndexError
    # retry - presumably it retries without a limit; confirm in backoff's docs.
    @clickhouse_cursor
    @backoff.on_exception(backoff.constant, IndexError, interval=10)
    async def pull(self, cursor: asynch.cursors.Cursor) -> DataFrame:
        """Execute the paging query on the cursor injected by ``clickhouse_cursor``.

        The table name, limit and offset are interpolated directly into the
        SQL text. The ``# <- Index error`` marker on the first query line is
        inside the f-string, so it is part of the text passed to
        ``cursor.execute``.

        NOTE(review): this looks like a truncated excerpt - nothing is fetched
        or returned here even though the annotation promises a ``DataFrame``.
        """
        await cursor.execute(f"""                                                                                                 # <- Index error
            select * 
            from {self.table_name}
            limit {self.limit}
            offset {self.pointer}
        """)
File "/opt/project/jobs/sync_attribution/utils/puller.py", line 94, in pull
  await cursor.execute(f"""
File "/usr/local/lib/python3.10/site-packages/asynch/cursors.py", line 64, in execute
  response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 623, in execute
  async with ExecuteContext(self, query, settings):
File "/usr/local/lib/python3.10/site-packages/asynch/proto/context.py", line 56, in __aexit__
  raise exc_val
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 639, in execute
  rv = await self.process_ordinary_query(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 768, in process_ordinary_query
  return await self.receive_result(with_column_types=with_column_types, columnar=columnar)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 487, in receive_result
  return await result.get_result()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/result.py", line 56, in get_result
  async for packet in self.packet_generator:
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 465, in packet_generator
  packet = await self.receive_packet()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 391, in receive_packet
  packet = await self._receive_packet()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 425, in _receive_packet
  packet.block = await self.receive_data()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 328, in receive_data
  return await (self.block_reader_raw if raw else self.block_reader).read()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/block.py", line 82, in read
  column = await read_column(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/__init__.py", line 155, in read_column
  return await column.read_data(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/base.py", line 112, in read_data
  return await self._read_data(n_items, nulls_map=nulls_map)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/base.py", line 115, in _read_data
  items = await self.read_items(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/stringcolumn.py", line 18, in read_items
  ret.append(await self.reader.read_str(as_bytes=self.read_as_bytes))
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 141, in read_str
  length = await self.read_varint()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 130, in read_varint
  packet = self._read_one()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 120, in _read_one
  packet = self.buffer[self.position]
IndexError: bytearray index out of range

clickhouse: 23.3.1.2823 python: 3.10 asynch: 0.2.2

draincoder commented 10 months ago

Hi, how do you handle this error? It looks like there is still no fix.

mikoda1995 commented 10 months ago

got the same error. Python 3.11, Clickhouse 23.10.2.13. asynch upstream

lainiwa commented 10 months ago

Getting the same exception. Seems to be a lasting issue: #10, #20, #25.

dtatarkin commented 10 months ago

Got the same error: clickhouse: 23.8.3.48 python: 3.11.4 asynch: 0.2.2