eakmanrq / sqlframe

Turning PySpark Into a Universal DataFrame API
https://sqlframe.readthedocs.io/en/stable/
MIT License
321 stars 9 forks

bug: Issue with aliasing cte dataframes when chaining joins and selects #184

Open eredzik opened 1 month ago

eredzik commented 1 month ago
from sqlframe.standalone import functions as SF

# get_df is a local helper (not shown) that returns a DataFrame for the named table
employee = get_df("employee")
store = get_df("store")
district = get_df("district")
result = (
    employee
    .alias('employee')
    .join(
        store
        .filter(SF.col("store_name") != "test")
        .alias('store'),
        on=SF.col('employee.employee_id') == SF.col('store.store_id')
    )
    .select('employee.*', 'store.*')  # with this select, the next join throws
    .join(  # exception raised here
        district
        .alias('district'),
        on=SF.col('store.store_id') == SF.col('district.district_id')
    )
)
result.show()

This throws the following error:

Traceback (most recent call last):
  File "<string>", line 16, in <module>
  File "/home/emil/projects/sqlframe/sqlframe/base/dataframe.py", line 1580, in show
    result = df.limit(n).collect()
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/emil/projects/sqlframe/sqlframe/base/dataframe.py", line 1550, in collect
    return self._collect()
           ^^^^^^^^^^^^^^^
  File "/home/emil/projects/sqlframe/sqlframe/base/dataframe.py", line 1553, in _collect
    return self.session._collect(self._get_expressions(optimize=False), **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/emil/projects/sqlframe/sqlframe/base/session.py", line 493, in _collect
    self._execute(sql)
  File "/home/emil/projects/sqlframe/sqlframe/duckdb/session.py", line 72, in _execute
    self._last_result = self._cur.execute(sql)  # type: ignore
                        ^^^^^^^^^^^^^^^^^^^^^^
duckdb.duckdb.BinderException: Binder Error: Referenced table "t15874714" not found!
Candidate tables: "t11089442", "t41607634"
LINE 1: ... "t41607634" INNER JOIN "t11089442" ON "t15874714"."store_id" = "t11089442"."d...

Possible change that resolves the issue

If I call .alias() after the .select(), the table seems to be re-aliased and the query below works just fine:

(
    employee
    .alias('employee')
    .join(
        store
        .filter(SF.col("store_name") != "test")
        .alias('store'),
        on=SF.col('employee.employee_id') == SF.col('store.store_id')
    )
    .select('employee.*', 'store.*')
    .alias('subquery')  # re-aliasing after the select makes the query work
    .join(
        district
        .alias('district'),
        on=SF.col('subquery.store_id') == SF.col('district.district_id')
    )
).show()
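
For reference, the duplicate column can be seen without executing anything by printing the SQL that sqlframe generates. A sketch reusing the get_df helper from above (df.sql() and df.columns are the PySpark-style methods sqlframe exposes):

# Build just the aliased subquery and inspect its SQL instead of running it.
subquery = (
    employee
    .alias('employee')
    .join(
        store
        .filter(SF.col("store_name") != "test")
        .alias('store'),
        on=SF.col('employee.employee_id') == SF.col('store.store_id')
    )
    .select('employee.*', 'store.*')
    .alias('subquery')
)
print(subquery.sql())    # the SELECT list contains store_id twice
print(subquery.columns)  # likewise, store_id appears twice here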
eakmanrq commented 1 month ago

What is interesting is that your workaround with the alias is still not correct. That is because there are duplicate store_id columns, and DuckDB picks the first one, whereas Spark automatically deals with the ambiguity. It seems that in the first example the "store_id" column from "store" is used instead of the one from "employee". That breaks the general rule that columns are resolved left -> right, which is likely because the column was used in the join.

This is a bit trickier to solve since I need to better understand how Spark handles this kind of ambiguity in order to replicate it properly. Thanks for pointing this out!
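
In the meantime, one way to sidestep the ambiguity entirely is to avoid carrying two store_id columns into the second join at all, e.g. by renaming the copy that comes from store. A minimal, untested sketch against the same tables (the store_store_id name is just illustrative):

joined = (
    employee
    .alias('employee')
    .join(
        store
        .filter(SF.col("store_name") != "test")
        .alias('store'),
        on=SF.col('employee.employee_id') == SF.col('store.store_id')
    )
    # keep employee's columns plus a renamed copy of store's store_id,
    # so the projection has no duplicate names for any engine to tie-break
    .select('employee.*', SF.col('store.store_id').alias('store_store_id'))
    .alias('subquery')
    .join(
        district.alias('district'),
        on=SF.col('subquery.store_store_id') == SF.col('district.district_id')
    )
)
joined.show()

Note this keeps only store_id from store; any other store columns you need would have to be selected (and, if they collide, renamed) explicitly.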