databricks / databricks-sql-python

Databricks SQL Connector for Python

ResultSet `fetchmany_arrow`/`fetchall_arrow` methods fail during `concat_tables` #418

Open ksofeikov opened 1 month ago

ksofeikov commented 1 month ago

Hi there,

I'm using this client library to fetch large amounts of data from our DBX environment. The version I'm using is 3.3.0.

The library keeps crashing when it attempts to concatenate the current and partial results. I can't attach the full trace because it contains some of our internal schemas, but here is the gist of it:

creation_date
First Schema: creation_date: timestamp[us, tz=Etc/UTC]
Second Schema: creation_date: timestamp[us, tz=Etc/UTC] not null

status
First Schema: status: string
Second Schema: status: string not null

sender_id
First Schema: sender_id: string
Second Schema: sender_id: string not null

...
and a few other fields with the exact same discrepancy

The exact stack trace is

Traceback (most recent call last):
  File "/Users/X/work/scripts/raw_order.py", line 34, in <module>
    for r in tqdm(cursor, total=max_items):
  File "/Users/X/work/.venv/lib/python3.10/site-packages/tqdm/std.py", line 1181, in __iter__
    for obj in iterable:
  File "/Users/X/work//.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 422, in __iter__
    for row in self.active_result_set:
  File "/Users/X/work//.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1112, in __iter__
    row = self.fetchone()
  File "/Users/X/work/.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1217, in fetchone
    res = self._convert_arrow_table(self.fetchmany_arrow(1))
  File "/Users/X/work/.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1193, in fetchmany_arrow
    results = pyarrow.concat_tables([results, partial_results])
  File "pyarrow/table.pxi", line 5962, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
GOES INTO THE DETAILS SPECIFIED ABOVE AS TO WHAT IS DIFFERENT

The data is coming through a cursor like this

from databricks import sql
from tqdm import tqdm

connection = sql.connect(
    server_hostname="X",
    http_path="B",
    access_token=app_settings.dbx_access_token,
)

cursor = connection.cursor()

max_items = 100000
batch_size = 10000

cursor.execute(
    f"SELECT * from X where creation_date between '2024-06-01' and '2024-09-01' limit {max_items}"
)

# iterating the cursor is what eventually fails (see the traceback above)
for r in tqdm(cursor, total=max_items):
    ...

The source table is created through a CTAS statement, so all fields are nullable by default. I have found two ways to resolve the issue:

I checked the 2.9.6 source code and it does not seem to use permissive schema casting, so this looks like a regression.
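
For reference, a minimal sketch of what such a permissive merge looks like in plain pyarrow (assuming pyarrow >= 14.0, where concat_tables accepts promote_options; the column and values below are made up to mimic the nullability mismatch shown above):

import pyarrow as pa

# Two batches whose schemas differ only in nullability, mimicking the
# "string" vs "string not null" mismatch from the result set above.
nullable_schema = pa.schema([pa.field("status", pa.string(), nullable=True)])
required_schema = pa.schema([pa.field("status", pa.string(), nullable=False)])

t1 = pa.table({"status": ["open", "closed"]}, schema=nullable_schema)
t2 = pa.table({"status": ["shipped"]}, schema=required_schema)

# With the default options this raises ArrowInvalid ("Schema at index 1 was
# different"); permissive promotion unifies the fields into a nullable schema.
merged = pa.concat_tables([t1, t2], promote_options="permissive")
print(merged.schema)  # status: string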

I'm not sure if I can add anything else beyond that, but do let me know.

And to be clear, I request around 100k records at a time, can iterate through roughly 95k of them, and then it fails, so I'm not really sure there is a reliable way to reproduce it.

If the cluster runtime matters, 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)

Thanks!

kravets-levko commented 1 month ago

Hi @ksofeikov! Thank you for reporting this issue and attaching the stacktrace. Before we go deeper into fixing the code, I would like to understand how two pieces of the same result set ended up with different schemas. Is it possible to create a synthetic example that reproduces this issue, so you don't have to reveal your data? It would help us check whether promote_options="permissive" is a proper fix (or find one if needed), and also check whether our other libraries have a similar bug.

ksofeikov commented 1 month ago

@kravets-levko tbh, I'm not sure how to create a reproducible example here, since the cursor just bulk-reads the table from the store. Since this happens during the result read-out stage, it's not really something that touches the client code or is controlled by me.

What I can do is maybe try to stop at a breakpoint and see how the cloud fetch gets a table with a different schema.

There is another pointer I could give, I guess. Let me know if I completely misunderstood it, but it seems like CloudFetch literally downloads files and then takes the results from there.

I remember seeing a similar pyarrow schema problem with dask when I downloaded files from AWS Athena and then tried to concatenate them through dd.read_parquet(...), and it would fail internally with the same error. Back then, I figured that if a slice of the parquet files happens to have all nulls for a column, its type is inferred as null and the concatenation breaks. What helped me back then was forcing the schema to a fixed set of types.
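
For illustration, that kind of schema forcing can look roughly like this (a sketch only: the file name is hypothetical, and Table.cast is just one way to coerce an all-null column back to the expected type):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical parquet slice whose only column was written as type "null"
# because every value in it happened to be null.
expected_schema = pa.schema([("a", pa.string())])

t = pq.read_table("all_null_slice.parquet")  # column "a" comes back as type null
t = t.cast(expected_schema)                  # casting null -> string keeps the nulls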

However, memory might be letting me down here :)

ksofeikov commented 1 month ago

I suspect one way to create an example is to craft two arrow files with one column each, where one has no null values in the column and the other has only nulls. Then read those files as if they were the query result; that will probably trigger it.

ksofeikov commented 1 month ago

Yeah, seems like the null assumption was correct. Here is a minimal working example:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Column "a" backed by Python str objects (object dtype in pandas)
df1 = pd.DataFrame({"a": ["a", "b"]}, dtype=str)
df1.to_parquet("1.parquet")

# Same column, but entirely null, so parquet/arrow infers its type as "null"
df2 = pd.DataFrame({"a": [None, None]}, dtype=str)
df2.to_parquet("2.parquet")

t1 = pq.read_table("1.parquet")
t2 = pq.read_table("2.parquet")

pa.concat_tables([t1, t2])

will throw

ArrowInvalid: Schema at index 1 was different: 
a: string
vs
a: null

ksofeikov commented 1 month ago

Also, the root cause of this is that the strings are backed by Python str, which is stored as object dtype in pandas. Backing the column with either the nullable pandas string dtype or string[pyarrow] does not lead to an exception and reads the data fine:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Same files as above, but the column uses the nullable pandas "string" dtype
df1 = pd.DataFrame({"a": ["a", "b"]}, dtype="string")
df1.to_parquet("1.parquet")
df2 = pd.DataFrame({"a": [None, None]}, dtype="string")
df2.to_parquet("2.parquet")

t1 = pq.read_table("1.parquet")
t2 = pq.read_table("2.parquet")

pa.concat_tables([t1, t2])

generates

pyarrow.Table
a: string
----
a: [["a","b"],[null,null]]

kravets-levko commented 1 month ago

@ksofeikov Yes, I totally understand what the error means. What I don't understand is why it happens. In your example, you literally created two different tables with different schemas. However, when you run a SQL query you get a single result set, all rows of which should have the same schema. Even if you UNION a few tables, the server should compute a common schema for the result. Even with CloudFetch it should be the same, because the server prepends the schema to each file before storing it in the cloud, and it should use the same schema for all of them.

I have some suspicions about where the schema may get changed while reading the data from the server, and I will try to reproduce your issue. Meanwhile, I want to ask you to check a couple of other things:

- try passing use_cloud_fetch=False to sql.connect and see if the error still occurs;
- try a different cluster runtime (e.g. 14.3 LTS or 15.4 LTS) and see if the behaviour changes.

ksofeikov commented 1 month ago

Just checked use_cloud_fetch=False, and it does solve the problem. I also made sure to remove my patches allowing permissive schema merges. It works without cloud fetch and starts failing again if I enable it.
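
For anyone else hitting this, the workaround boils down to one extra flag at connect time (a sketch; the hostname, path and token are placeholders):

from databricks import sql

# Workaround discussed in this thread: disable CloudFetch so results are
# streamed inline rather than read back from cloud-stored Arrow files.
connection = sql.connect(
    server_hostname="X",       # placeholder
    http_path="B",             # placeholder
    access_token="<token>",    # placeholder
    use_cloud_fetch=False,
)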

ksofeikov commented 1 month ago

Changing the environment from 13.3 LTS to 14.3 LTS and 15.4 LTS seemingly does nothing to help: the code still fails if cloud fetch is enabled.

Is there a way to verify the env version on the client, just to make sure it was picked up after changing the compute on the platform?

Regardless, there are a couple of suitable workarounds for now, so good luck with the fix!

kravets-levko commented 1 month ago

It's good that the workaround helped and you can continue using the library while we're looking for a fix. Too bad we have another issue with CloudFetch 🤔 Anyway, thank you for the bug report and all your help, which allowed us to narrow down the scope of the issue (even though I still need to reproduce it 🙂). I'll keep you posted on any updates or further questions.

unj1m commented 1 hour ago

I just got bitten by this as well. FWIW, no pandas was involved in my use case. Just SQL.

It's good to know that disabling cloud fetch is a work-around.