voltrondata / spark-substrait-gateway

Implements a gateway that speaks the SparkConnect protocol and drives a backend using Substrait (over ADBC Flight SQL).
Apache License 2.0

Support JoinType "full_outer" with DuckDB backend #69

Closed. pthatte1-bb closed this issue 1 month ago.

pthatte1-bb commented 1 month ago

Queries with JoinType "full_outer" fail with error "Unsupported join type".

Snippet to reproduce the error:

from pyspark.sql.functions import col

# Assumed: get_customer_database() and the spark_session fixture come from the gateway's test suite.
df1 = (
    get_customer_database(spark_session).filter(col("c_custkey").eqNullSafe(131074))
    .select(col("c_custkey").alias("join_custkey"), col("c_name"))
)
df2 = (
    get_customer_database(spark_session).filter(col("c_custkey").eqNullSafe(131075))
    .select(col("c_custkey").alias("other_custkey"), col("c_name"))
)
# how="full" is PySpark's alias for a full outer join.
results = df1.join(df2, on=col("join_custkey").eqNullSafe(col("other_custkey")), how="full").collect()
print(results)

Changing the JoinType to "left" or "right" works.
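As a rough illustration of why only the full join fails, here is a sketch of how a Spark-to-Substrait translator might resolve PySpark's how aliases. The enum names follow the Spark Connect Join.JoinType and Substrait JoinRel.JoinType protobuf definitions; the dictionaries and substrait_join_type() are hypothetical and are not the gateway's actual code. Left and right joins map to JOIN_TYPE_LEFT and JOIN_TYPE_RIGHT, while a full join becomes Substrait's JOIN_TYPE_OUTER, the case that failed in this report.

```python
# Hypothetical sketch, NOT the gateway's real translation code.
# Enum names are taken from the Spark Connect and Substrait protobufs.

# A subset of PySpark's documented `how` aliases for DataFrame.join.
SPARK_HOW_ALIASES = {
    "inner": "JOIN_TYPE_INNER",
    "outer": "JOIN_TYPE_FULL_OUTER",
    "full": "JOIN_TYPE_FULL_OUTER",
    "fullouter": "JOIN_TYPE_FULL_OUTER",
    "full_outer": "JOIN_TYPE_FULL_OUTER",
    "left": "JOIN_TYPE_LEFT_OUTER",
    "leftouter": "JOIN_TYPE_LEFT_OUTER",
    "left_outer": "JOIN_TYPE_LEFT_OUTER",
    "right": "JOIN_TYPE_RIGHT_OUTER",
    "rightouter": "JOIN_TYPE_RIGHT_OUTER",
    "right_outer": "JOIN_TYPE_RIGHT_OUTER",
}

# Spark Connect join type -> Substrait JoinRel join type.
SPARK_TO_SUBSTRAIT = {
    "JOIN_TYPE_INNER": "JOIN_TYPE_INNER",
    "JOIN_TYPE_FULL_OUTER": "JOIN_TYPE_OUTER",
    "JOIN_TYPE_LEFT_OUTER": "JOIN_TYPE_LEFT",
    "JOIN_TYPE_RIGHT_OUTER": "JOIN_TYPE_RIGHT",
}


def substrait_join_type(how: str) -> str:
    """Resolve a PySpark `how` string to a Substrait join type name."""
    spark_type = SPARK_HOW_ALIASES.get(how.lower())
    if spark_type is None:
        raise ValueError(f"Unsupported join type: {how!r}")
    return SPARK_TO_SUBSTRAIT[spark_type]


for how in ("left", "right", "full"):
    print(how, "->", substrait_join_type(how))
```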

The equivalent DuckDB SQL query also works:

WITH raw_data as (SELECT * from read_parquet('{tpch_customer_parquet_path}')),
cte1 as (SELECT c_custkey as join_custkey, c_name from raw_data where c_custkey = 131074),
cte2 as (SELECT c_custkey as other_custkey, c_name from raw_data where c_custkey = 131075)
select * from cte1 full join cte2 on join_custkey = other_custkey
EpsilonPrime commented 1 month ago

DuckDB may support this, but its Substrait implementation does not:

install substrait;
load substrait;
install tpch;
load tpch;

call dbgen(sf=1);

.width -1
.mode csv

FROM get_substrait_json('
    WITH raw_data as (SELECT * from customer),
    cte1 as (SELECT c_custkey as join_custkey, c_name from raw_data where c_custkey = 131074),
    cte2 as (SELECT c_custkey as other_custkey, c_name from raw_data where c_custkey = 131075)
    select * from cte1 full join cte2 on join_custkey = other_custkey;
  ');

This results in the following error:

INTERNAL Error: Unsupported join type FULL
This error signals an assertion failure within DuckDB. This usually occurs due to unexpected conditions or errors in the program's logic.
For more information, see https://duckdb.org/docs/dev/internal_errors
EpsilonPrime commented 1 month ago

I'll file an issue.

pthatte1-bb commented 1 month ago

Thank you. I did some digging, and in case it helps:

I was able to emulate a FULL OUTER join by combining a LEFT join and a RIGHT join with UNION, followed by DISTINCT. So hopefully the missing feature is achievable and not an intentional exclusion.
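A minimal sketch of that workaround, assuming Python's built-in sqlite3 module as a stand-in engine (it is not the DuckDB or Substrait path discussed in this issue). The RIGHT join is written as a LEFT join with the tables swapped, and UNION drops the second copy of each row produced by both halves for matching keys, which plays the role of the DISTINCT step. The table names, columns, and rows are hypothetical sample data.

```python
import sqlite3


def full_outer_join_demo():
    """Emulate FULL OUTER JOIN as LEFT JOIN UNION (swapped) LEFT JOIN."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical sample tables mirroring cte1/cte2 from the issue.
    conn.execute("CREATE TABLE cte1 (join_custkey INTEGER, c_name1 TEXT)")
    conn.execute("CREATE TABLE cte2 (other_custkey INTEGER, c_name2 TEXT)")
    conn.executemany("INSERT INTO cte1 VALUES (?, ?)", [(1, "x"), (131074, "cust_a")])
    conn.executemany("INSERT INTO cte2 VALUES (?, ?)", [(1, "y"), (131075, "cust_b")])

    # First half keeps every cte1 row (LEFT join). Second half keeps every
    # cte2 row (a RIGHT join written as a swapped LEFT join). UNION removes
    # the duplicate copy of each matched row, acting as the DISTINCT step.
    query = """
        SELECT a.join_custkey, a.c_name1, b.other_custkey, b.c_name2
        FROM cte1 AS a LEFT JOIN cte2 AS b ON a.join_custkey = b.other_custkey
        UNION
        SELECT a.join_custkey, a.c_name1, b.other_custkey, b.c_name2
        FROM cte2 AS b LEFT JOIN cte1 AS a ON a.join_custkey = b.other_custkey
    """
    rows = conn.execute(query).fetchall()
    conn.close()
    return rows


# Sort by repr() because rows mix integers and None.
for row in sorted(full_outer_join_demo(), key=repr):
    print(row)
```

One caveat: because UNION deduplicates whole rows, input rows that are exact duplicates of each other collapse into one, which a real FULL OUTER JOIN would keep. Using UNION ALL and restricting the second half to unmatched rows (left-side key IS NULL) avoids that.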

EpsilonPrime commented 1 month ago

DuckDB merged support for full outer joins in Substrait today. It will be in the next release.

pthatte1-bb commented 1 month ago

I see the commit, thank you. I'm trying to build and test it locally, but I'm happy to close the issue sooner if you like.

EpsilonPrime commented 1 month ago

I have a test I'll add for this once that version becomes available. Until then, I'm okay with keeping this open. I might look into labels so we can mark the issues we're waiting on for easier tracking.

pthatte1-bb commented 1 month ago

I tried with a locally built binary. Snippet used for testing:

WITH raw_data as (SELECT * from customer),
cte1 as (SELECT c_custkey as join_custkey, c_name from raw_data where c_custkey = 131074),
cte2 as (SELECT c_custkey as other_custkey, c_name from raw_data where c_custkey = 131075)
select * from cte1 full join cte2 on join_custkey = other_custkey;

Producing the Substrait plan works fine. Consuming the produced plan fails with a validation error (dup-col-name "c_name"), because both sides of the join output a column named c_name.
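To make the failure concrete: select * over this join yields join_custkey, c_name, other_custkey, c_name, so c_name appears twice in the output schema. The hypothetical helper below (not part of DuckDB or any Substrait validator) performs the same kind of duplicate-name check on a list of output column names, using the column lists of the failing query and of the aliased query that passes validation.

```python
from collections import Counter


def duplicate_column_names(names):
    """Return names that occur more than once, in first-seen order (hypothetical check)."""
    counts = Counter(names)
    dups = []
    for name in names:
        if counts[name] > 1 and name not in dups:
            dups.append(name)
    return dups


# Output columns of `select *` over each version of the full join.
failing = ["join_custkey", "c_name", "other_custkey", "c_name"]
aliased = ["join_custkey", "c_name1", "other_custkey", "c_name2"]

print(duplicate_column_names(failing))
print(duplicate_column_names(aliased))
```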

As far as I can tell, this may have slipped through because the nested-expressions PR lacks Substrait consumption tests: https://github.com/duckdb/substrait/pull/104/files#diff-83cddda20fbd3b324186aab07ed1d31b844236d9ae79727c60444002a3c8c7dfR48

Closing this issue because the full join does work with a query that passes validation (each c_name column gets a distinct alias). Snippet to reproduce the success:

WITH raw_data as (SELECT * from customer),
cte1 as (SELECT c_custkey as join_custkey, c_name as c_name1 from raw_data where c_custkey = 131074),
cte2 as (SELECT c_custkey as other_custkey, c_name as c_name2 from raw_data where c_custkey = 131075)
select * from cte1 full join cte2 on join_custkey = other_custkey;