eakmanrq / sqlframe

Turning PySpark Into a Universal DataFrame API
https://sqlframe.readthedocs.io/en/stable/
MIT License

On the duckdb engine, some DataFrame functions fail after joins if subsequent functions use the key/id column without referencing the source df #50

Closed cristian-marisescu closed 1 month ago

cristian-marisescu commented 1 month ago

Hi,

I encountered the following errors with some functions, like .select() and .withColumn(s), when you use the key column name after a join without referencing the df it came from. Here is some code to reproduce it:

from sqlframe.duckdb import DuckDBDataFrame as DataFrame
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F

spark = DuckDBSession()

initial: DataFrame = spark.createDataFrame(
    [
        ("some_id"),
        ("another_id"),
    ],
    ["example_key"],
)

for_join: DataFrame = spark.createDataFrame(
    [
        ("some_id", "some_id"),
        ("another_id", "slightly_different_id"),
    ],
    ["example_key", "new_key"],
)

joined = initial.join(for_join, on="example_key", how="left").select(
    F.col("example_key").alias("just_renamed_key") 
)

or 

joined = initial.join(for_join, on="example_key", how="left").withColumn(
    "just_renamed_key", F.col("example_key")
)

This fails with:

python sqlframe_playground.py
Traceback (most recent call last):
  File "/workspaces/redacted/sqlframe_playground.py", line 24, in <module>
    joined = initial.join(for_join, on="example_key", how="left").select(
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/operations.py", line 48, in wrapper
    df: t.Union[_BaseDataFrame, _BaseGroupedData] = func(self, *args, **kwargs)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/dataframe.py", line 679, in select
    cte = ctes_with_column[resolved_column_position[ambiguous_col]]
IndexError: list index out of range

There is also another error when applying a function to the key column, which only surfaces after calling .show():

joined = initial.join(for_join, on="example_key", how="left").select(
    F.coalesce("example_key", "new_key").alias("example_key")
)
joined.show()

This fails with:

python sqlframe_playground.py
Traceback (most recent call last):
  File "/workspaces/redacted/sqlframe_playground.py", line 39, in <module>
    joined.show()
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/dataframe.py", line 1512, in show
    result = self.session._fetch_rows(sql)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/session.py", line 455, in _fetch_rows
    self._execute(sql, quote_identifiers=quote_identifiers)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/session.py", line 427, in _execute
    self._cur.execute(self._to_sql(sql, quote_identifiers=quote_identifiers))
duckdb.duckdb.BinderException: Binder Error: Ambiguous reference to column name "example_key" (use: "t24194196.example_key" or "t26006366.example_key")

It looks like both errors are caused by an ambiguous column reference, which you would also hit in plain SQL since you need to qualify the column. This is supported by the fact that replacing example_key with initial.example_key and new_key with for_join.new_key works in both cases:


joined = initial.join(for_join, on=["example_key"], how="left").select(
    F.coalesce(for_join.new_key, initial.example_key).alias("example_key")
)

joined.show()

works as expected

Although explicitly referencing the dataframes works around the issue, there are a lot of cases where just using the same key name on both sides improves readability, especially when dataframe names can get quite long.
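
For reference, here is a minimal sketch against plain duckdb (written by hand, not generated by sqlframe; the CTE names just mirror the repro above) showing that when the equi-join is written with USING, DuckDB itself resolves the unqualified key without ambiguity:

import duckdb

# Sketch only: same data as above, queried directly in DuckDB rather than via sqlframe.
# With LEFT JOIN ... USING, the shared key column is merged, so an unqualified
# reference to example_key inside COALESCE is not ambiguous.
print(
    duckdb.sql(
        """
        WITH initial AS (
            SELECT * FROM (VALUES ('some_id'), ('another_id')) AS t1(example_key)
        ), for_join AS (
            SELECT * FROM (VALUES ('some_id', 'some_id'),
                                  ('another_id', 'slightly_different_id')) AS t2(example_key, new_key)
        )
        SELECT COALESCE(example_key, new_key) AS example_key
        FROM initial
        LEFT JOIN for_join USING (example_key)
        """
    ).fetchall()
)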

eakmanrq commented 1 month ago

Thanks for reporting this! Fixed in 1.6.1

cristian-marisescu commented 1 month ago

@eakmanrq, indeed it now works for a plain alias, but using the column in an expression still throws an error:

from sqlframe.duckdb import DuckDBDataFrame as DataFrame
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F

spark = DuckDBSession()

initial: DataFrame = spark.createDataFrame(
    [
        ("some_id"),
        ("another_id"),
    ],
    ["example_key"],
)

for_join: DataFrame = spark.createDataFrame(
    [
        ("some_id", "some_id"),
        ("another_id", "slightly_different_id"),
    ],
    ["example_key", "new_key"],
)

joined = initial.join(for_join, on="example_key", how="left").select(
    F.coalesce("example_key", "new_key").alias("final_key")  # here on the coalesce
)
joined.show()
Traceback (most recent call last):
  File "/workspaces/redacted/frame.py", line 27, in <module>
    joined.show()
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/dataframe.py", line 1527, in show
    result = self.session._fetch_rows(sql)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/session.py", line 455, in _fetch_rows
    self._execute(sql, quote_identifiers=quote_identifiers)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/session.py", line 427, in _execute
    self._cur.execute(self._to_sql(sql, quote_identifiers=quote_identifiers))
duckdb.duckdb.BinderException: Binder Error: Ambiguous reference to column name "example_key" (use: "t24194196.example_key" or "t26006366.example_key")

or any function for that matter:

F.size("example_key").alias("final_key")

Btw, changing joined.show() to joined.sql() hits a sqlglot OptimizerError instead of the DuckDB one:

Traceback (most recent call last):
  File "/workspaces/redacted/frame.py", line 27, in <module>
    joined.sql()
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/dataframe.py", line 544, in sql
    exp.Select, self.session._optimize(select_expression, dialect=dialect)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlframe/base/session.py", line 422, in _optimize
    return optimize(expression, dialect=dialect, schema=self.catalog._schema)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlglot/optimizer/optimizer.py", line 92, in optimize
    optimized = rule(optimized, **rule_kwargs)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlglot/optimizer/qualify.py", line 90, in qualify
    validate_qualify_columns_func(expression)
  File "/workspaces/redacted/.venv/lib/python3.10/site-packages/sqlglot/optimizer/qualify_columns.py", line 102, in validate_qualify_columns
    raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
sqlglot.errors.OptimizeError: Column '"example_key"' could not be resolved

Doing some debugging on the size example: the generated SQL qualifies the column correctly when it is selected directly, but not when it is wrapped in a function.

F.size

"WITH t26006366 AS (SELECT CAST(example_key AS TEXT) AS example_key FROM (VALUES ('some_id'), ('another_id')) AS a1(example_key)),

t24194196 AS (SELECT CAST(example_key AS TEXT) AS example_key, CAST(new_key AS TEXT) AS new_key FROM (VALUES ('some_id', 'some_id'), ('another_id', 'slightly_different_id')) AS a2(example_key, new_key)),

t37939134 AS (SELECT ARRAY_LENGTH(example_key) AS final_key FROM t26006366 LEFT JOIN t24194196 ON t26006366.example_key = t24194196.example_key) SELECT final_key FROM t37939134 LIMIT 20"

vs

F.col

"WITH t26006366 AS (SELECT CAST(example_key AS TEXT) AS example_key FROM (VALUES ('some_id'), ('another_id')) AS a1(example_key)),

t24194196 AS (SELECT CAST(example_key AS TEXT) AS example_key, CAST(new_key AS TEXT) AS new_key FROM (VALUES ('some_id', 'some_id'), ('another_id', 'slightly_different_id')) AS a2(example_key, new_key)),

t36021190 AS (SELECT t26006366.example_key AS final_key FROM t26006366 LEFT JOIN t24194196 ON t26006366.example_key = t24194196.example_key) SELECT final_key FROM t36021190 LIMIT 20"

eakmanrq commented 1 month ago

Yeah, good catch. I was a bit too strict in my search for ambiguous columns, so the fix now takes whatever expression it finds, searches within it for any ambiguous columns, and resolves those. Thanks for reporting!
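
Roughly the idea, sketched with sqlglot (this is not the actual sqlframe code; the helper name and the hard-coded CTE name are just illustrative):

from sqlglot import exp, parse_one

def qualify_ambiguous(expression: exp.Expression, ambiguous: set[str], table: str) -> exp.Expression:
    # Walk the full expression tree (not just top-level columns) and qualify any
    # unqualified column whose name is ambiguous between the joined CTEs.
    for column in expression.find_all(exp.Column):
        if not column.table and column.name in ambiguous:
            column.set("table", exp.to_identifier(table))
    return expression

expr = parse_one("COALESCE(example_key, new_key)")
print(qualify_ambiguous(expr, {"example_key"}, "t26006366").sql())
# COALESCE(t26006366.example_key, new_key)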

eakmanrq commented 1 month ago

Fixed in 1.6.3