databricks / databricks-sql-python

Databricks SQL Connector for Python
Apache License 2.0

Connection broken: IncompleteRead with Parallel Queries #438

Open Uranium2 opened 2 months ago

Uranium2 commented 2 months ago

Hello,

My team and I have been using the databricks-sql-python library for a year now. It works great for the majority of our use cases, most of which involve linear algorithms that fetch data one query at a time. Occasionally, we run multiple jobs with parallel queries, and the Databricks Warehouse manages the workload using the queue without any issues.

Recently, we encountered a new use case where a developer needed to execute a large number of queries in parallel. I advised him that the Databricks connector is not thread-safe, so he implemented a process pool, creating a new connector for each process. For context, we are using an Azure machine with 96 CPUs and 672 GB RAM, but we limit the pool to 20 processes, so at most 20 queries run at the same time. I understand that the query queue limit in a warehouse is 1,000 queries. Our warehouse is a Pro warehouse, size Large, set to scale up to 5 nodes. So that should not be an issue.
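For reference, the one-connection-per-process pattern described above can be sketched like this. The names (`run_parallel`, `_fetch_one`) and the `connect` factory are illustrative, not part of the connector's API; in practice `connect` would wrap `databricks.sql.connect(...)` with your credentials:

```python
import multiprocessing

_conn = None  # this worker process's private connection, set by the initializer


def _init_worker(connect):
    # Each worker opens its own connection, since a databricks-sql Connection
    # must not be shared across threads or processes.
    global _conn
    _conn = connect()


def _fetch_one(query):
    # Runs inside a worker; uses that worker's private connection.
    cursor = _conn.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()
    finally:
        cursor.close()


def run_parallel(queries, connect, processes=20):
    # "fork" lets workers inherit the connect callable without pickling it
    # (Linux/macOS only; on Windows a picklable top-level factory is needed).
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes, initializer=_init_worker, initargs=(connect,)) as pool:
        return pool.map(_fetch_one, queries)
```

With this shape, each of the 20 workers holds exactly one session for its whole lifetime instead of opening a session per query.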

The issue we're facing is that one query frequently fails near the end of the process, despite all queries being similar (mainly differing by a filter). We're unsure why this happens. We tried increasing the number of nodes and the warehouse size, but without success.

2024-09-12 05:53:07,689 - client - Successfully opened session 01ef70cb-48f7-1ead-96e7-cac77c42cbc1
2024-09-12 05:53:26,668 - fetch - Fetching optimization input data for enseigne: X, agence: XXXX
2024-09-12 05:53:26,768 - client - Successfully opened session 01ef70cb-5456-1a9f-a337-d5cd7a084db3
2024-09-12 05:53:29,651 - fetch - Fetching optimization input data for enseigne: Y, agence: YYYY
2024-09-12 05:53:29,760 - client - Successfully opened session 01ef70cb-561d-170b-a7b6-18c72ddc83ae
2024-09-12 05:53:33,840 - fetch - Fetching optimization input data for enseigne: X, agence: XXX
2024-09-12 05:53:33,961 - client - Successfully opened session 01ef70cb-589f-1010-a993-4b9d7f3d36b6
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 560, in _get_chunk_left
    chunk_left = self._read_next_chunk_size()
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 527, in _read_next_chunk_size
    return int(line, 16)
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 592, in _readinto_chunked
    chunk_left = self._get_chunk_left()
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 562, in _get_chunk_left
    raise IncompleteRead(b'')
http.client.IncompleteRead: IncompleteRead(0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/urllib3/response.py", line 567, in read
    data = self._fp_read(amt) if not fp_closed else b""
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/urllib3/response.py", line 533, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 463, in read
    n = self.readinto(b)
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 497, in readinto
    return self._readinto_chunked(b)
  File "/azureml-envs/python3.9/lib/python3.9/http/client.py", line 608, in _readinto_chunked
    raise IncompleteRead(bytes(b[0:total_bytes]))
http.client.IncompleteRead: IncompleteRead(301981 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/azureml-envs/python3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/azureml-envs/python3.9/lib/python3.9/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/mnt/azureml/cr/j/e18e2ab962444224bfbbbde1057dfc11/exe/wd/core/tarif_optimization/pipelines/fetch.py", line 73, in fetch_input_data_for_agence
    data = dh.fetch_all(query)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/datalab_framework/data_accessor/timing.py", line 69, in wrapper
    result = function(*args, **kwargs)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/datalab_framework/data_accessor/data_handler.py", line 140, in fetch_all
    self._execute_query(cursor, query)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/datalab_framework/data_accessor/data_handler.py", line 112, in _execute_query
    cursor.execute(query)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/client.py", line 505, in execute
    self.active_result_set = ResultSet(
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/client.py", line 818, in __init__
    self._fill_results_buffer()
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/client.py", line 830, in _fill_results_buffer
    results, has_more_rows = self.thrift_backend.fetch_results(
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 925, in fetch_results
    resp = self.make_request(self._client.FetchResults, req)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 429, in make_request
    response_or_error_info = attempt_request(attempt)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 369, in attempt_request
    raise err
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 347, in attempt_request
    response = method(request)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/TCLIService.py", line 757, in FetchResults
    return self.recv_FetchResults()
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/TCLIService.py", line 776, in recv_FetchResults
    result.read(iprot)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/TCLIService.py", line 3679, in read
    self.success.read(iprot)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/ttypes.py", line 8714, in read
    self.results.read(iprot)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/ttypes.py", line 3213, in read
    _elem121.read(iprot)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/thrift_api/TCLIService/ttypes.py", line 2868, in read
    self.batch = iprot.readBinary()
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/thrift/protocol/TBinaryProtocol.py", line 234, in readBinary
    s = self.trans.readAll(size)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/thrift/transport/TTransport.py", line 62, in readAll
    chunk = self.read(sz - have)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/databricks/sql/auth/thrift_http_client.py", line 126, in read
    return self.__resp.read(sz)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/urllib3/response.py", line 593, in read
    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
  File "/azureml-envs/python3.9/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/azureml-envs/python3.9/lib/python3.9/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(301981 bytes read)', IncompleteRead(301981 bytes read))
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/azureml/cr/j/e18e2ab962444224bfbbbde1057dfc11/exe/wd/3_optimize_tarifs.py", line 62, in <module>
    main()
  File "/mnt/azureml/cr/j/e18e2ab962444224bfbbbde1057dfc11/exe/wd/3_optimize_tarifs.py", line 22, in main
    fetch_pipeline.run(settings=settings, data_handler=data_handler)
  File "/mnt/azureml/cr/j/e18e2ab962444224bfbbbde1057dfc11/exe/wd/core/tarif_optimization/pipelines/fetch.py", line 28, in run
    pool.starmap(fetch_input_data_for_agence, args)
  File "/azureml-envs/python3.9/lib/python3.9/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/azureml-envs/python3.9/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(301981 bytes read)', IncompleteRead(301981 bytes read))
2024-09-12 05:53:55,297 - client - Closing session 01ef70b9-5591-1a64-be6c-f05f76c916bc
2024-09-12 05:53:55,402 - clientserver - Closing down clientserver connection

Do you have any insights into what might be causing this issue? We're using the latest version of the connector, databricks-sql-connector==3.4.0.
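As a stopgap while the root cause is unclear, one option is to retry only on the exact error from the traceback (`urllib3.exceptions.ProtocolError`). This is a generic sketch, not connector functionality; `fetch` stands in for whatever helper runs the query (e.g. the `dh.fetch_all` seen in the traceback):

```python
import time

from urllib3.exceptions import ProtocolError


def fetch_with_retry(fetch, query, attempts=3, backoff=2.0):
    """Retry `fetch(query)` on a broken connection; other errors propagate."""
    for attempt in range(1, attempts + 1):
        try:
            return fetch(query)
        except ProtocolError:
            if attempt == attempts:
                raise  # exhausted retries; surface the original error
            time.sleep(backoff * attempt)  # simple linear backoff
```

Since the failing query is similar to the ones that succeed, a retry on a fresh request usually has a good chance of going through.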

kravets-levko commented 2 months ago

Hi @Uranium2! Thank you for reporting this, and also for including all the logs. The exception is thrown by the http library, which cannot parse the server response. I've found this issue, which looks related. So what I would suggest starting with is to somehow dump the raw server response the next time such an error happens. Then analyze the dump, including empty lines and the presence of correct newline sequences. If everything looks correct there, please let me know, and we'll think about what to do next. But if you notice anything wrong with the response itself, reach out to Databricks support and show them the dump.
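One way to start gathering that evidence is the stdlib's wire-level debug switch. This is a generic Python debugging knob, not a connector feature: it prints the request and the response status line and headers (not the body), which at least confirms whether `Transfer-Encoding: chunked` is in use and at what point the connection drops. A full body dump would need an intercepting proxy such as mitmproxy:

```python
import http.client
import logging

# Make http.client print() each request and the response status/headers.
# Very verbose: enable only for the failing job, not in normal runs.
http.client.HTTPConnection.debuglevel = 1

# Also surface urllib3's own connection/retry logging.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)
```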