ClickHouse / clickhouse-connect

Python driver/sqlalchemy/superset connectors
Apache License 2.0
326 stars 64 forks source link

Streaming with query_row_block_stream crashes after few reads #399

Closed keu closed 1 month ago

keu commented 1 month ago

Describe the bug

The streaming read is crashing after some time if there any processing in between reads.

Traceback (most recent call last):
  File "/usr/lib/python3.11/http/client.py", line 573, in _get_chunk_left
    chunk_left = self._read_next_chunk_size()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 540, in _read_next_chunk_size
    return int(line, 16)
           ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b'\xe9\xe9\x80\xe8\xf8\x8c\x05I"\xe7\x1bh\x9e\x0c\xa6\x06\xe3\x92\xa3?\x7f6:e\xec\x9d\xa8\x01d\xd3\x118{\xe1\xe9\xe60ta\x11T\xeb\xd0\x1d\x91\xed\xbf\x8aT5\xc8\x00\xceN\x1bA{\xe4\xf9\xeb\xd0\xc5\xba\xb
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/python3.11/http/client.py", line 590, in _read_chunked
    chunk_left = self._get_chunk_left()
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 575, in _get_chunk_left
    raise IncompleteRead(b'')
http.client.IncompleteRead: IncompleteRead(0 bytes read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 567, in read
    data = self._fp_read(amt) if not fp_closed else b""
           ^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 533, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
           ^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 467, in read
    return self._read_chunked(amt)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 605, in _read_chunked
    raise IncompleteRead(b''.join(value)) from exc
http.client.IncompleteRead: IncompleteRead(134 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/progai/pipeline/airflow/dags/shared/female_names_processing.py", line 76, in execute
    for i, block in enumerate(stream):
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/common.py", line 201, in __next__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/query.py", line 296, in _row_block_stream
    for block in self._column_block_stream():
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/transform.py", line 75, in gen
    next_block = get_block()
                 ^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/transform.py", line 50, in get_block
    column = col_type.read_column(source, num_rows, context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/base.py", line 143, in read_column
    return self.read_column_data(source, num_rows, ctx)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/base.py", line 158, in read_column_data
    column = self._read_column_binary(source, num_rows, ctx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/string.py", line 34, in _read_column_binary
    return source.read_str_col(num_rows, self._active_encoding(ctx))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "clickhouse_connect/driverc/buffer.pyx", line 248, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 134, in clickhouse_connect.driverc.buffer.ResponseBuffer._read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 74, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_bytes_c
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/httputil.py", line 200, in decompress
    chunk = response.read(chunk_size, decode_content=False)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 566, in read
    with self._error_catcher():
  File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(134 bytes read)', IncompleteRead(134 bytes read))

Steps to reproduce

  1. start stream read
  2. consume iterator with some work/sleep
  3. soon it will crash with the error above

Expected behaviour

I expect it to read the whole dataset. If I disable processing, the dataset is read fine (40 mln records). This leads me to think that it is not related to actual data and response but something inside the implementation.

Code example

import clickhouse_connect
read_client = clickhouse_connect.get_client(
            host=host, username=login, password=password, port=port,
            connect_timeout=60 * 30, send_receive_timeout=60 * 30, client_name="airflow-read",
            settings={
                "session_timeout": 60 * 20,
            }
        )
qry = 'SELECT id, name FROM dev.tbl WHERE NOT empty(name)'
with read_client.query_row_block_stream(qry) as stream:
    for block in stream:
        rows = list(process(block, dataset))

clickhouse-connect and/or ClickHouse server logs

Configuration

Environment

ClickHouse server

genzgd commented 1 month ago

It's crashing because your connection is breaking: 'Connection broken: IncompleteRead(134 bytes read)'. This could be caused by something like an idle timeout anywhere in your chain. How much time is processing taking per chunk?

keu commented 1 month ago

The average processing time for a single block is ~10 seconds. The connection breaks around 4-5 minute after the read started. The server and the client are on the same machine.

genzgd commented 1 month ago

After digging into this I believe the problem is that ClickHouse server times out when pushing more data because the client has not read all of the data off the socket. When trying to reproduce this I get the following error in the ClickHouse logs:

DynamicQueryHandler: Code: 209. DB::Exception: Timeout exceeded while writing to socket ([::1]:63309, 30000 ms): While executing Native. (SOCKET_TIMEOUT)

The socket is still busy/full when ClickHouse tries to send the error:

DynamicQueryHandler: Cannot send exception to client: Code: 209. DB::NetException: Timeout exceeded while writing to socket ([::1]:63309, 30000 ms). (SOCKET_TIMEOUT)

The end result is that if reading data falls more than 30 seconds behind ClickHouse sending the data, ClickHouse will close the connection, causing the error you see.

There's not an easy fix directly in clickhouse-connect because the point of streaming is to avoid reading all of the data at once into memory -- but if your processing is falling behind, the data sent from ClickHouse has no place to go. So the short term answer to your problem is to only query as much data from ClickHouse as you can process without falling behind more than 30 seconds. Unfortunately the HTTP interface in ClickHouse is not "smart" to keep the connection open and stream the data as it is being consumed.

However, in the next release I'm looking at adding an intermediate buffer with a configurable size to temporarily store the HTTP data until requested by the stream processing. So if your total query size is something like 50MB, and the new intermediate buffer is configured at 100MB, you should not have this issue. But there will definitely be a tradeoff between using the additional memory and ensuring that your connection isn't closed while processing.

keu commented 1 month ago

@genzgd thank you for the investigation. I understand it more precisely now. So, the short-term fix for me would be something like this

client = clickhouse_connect.get_client(
   ...
   settings={"max_block_size": 30 / seconds_per_row}
)
genzgd commented 1 month ago

max_block_size isn't going to reduce the total amount of data you're consuming, and you'll still fall behind as ClickHouse tries to push data faster than you can process it. You either need faster processing or to read less data in the same amount of time by using queries that return less data.

keu commented 1 month ago

I see. Unfortunately, I don't see an easy way to make processing faster without going into buffering and multithreading, which IMO probably will cause more troubles in future. So the only option I see for now is to save data to disk and then read from it. AFAIK Clickhouse doesn't have an analog to Postgres server-side cursor...

genzgd commented 1 month ago

Unfortunately, no, there is no server side cursor. If you can break your query up into chunks based on the primary key, you could read each chunk into memory (using just the client query method instead of streaming), then process each chunk, and then get the next one. Some version of that is going to be necessary unless you do something to match your processing speed with the amount of data ClickHouse is sending.