apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Add CometRowToColumnar operator #119

Closed advancedxy closed 2 months ago

advancedxy commented 3 months ago

What is the problem the feature request solves?

I just tried Comet locally with spark-shell. It turns out that Comet requires all input to be columnar, so it is not possible to test Comet with the following code:

spark.range(10, 1000, 10).selectExpr("id", "id + 1 as val").repartition(10, col("id"))

Adding a CometRowToColumnar operator on top of the leaf node (RangeExec) would make it easy to try Comet out.

In the long term, I think CometRowToColumnar could wrap Spark's row-based source exec nodes, so that all downstream operators become columnar and can leverage Comet's columnar execution.
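
To make the idea concrete, here is a minimal sketch of what a row-to-columnar step does conceptually. It is plain Scala with no Spark or Comet dependency, and it is not how Comet implements (or would implement) the operator: `Row`, `ColumnarBatch`, `RowToColumnarSketch`, and the batch size are all hypothetical names chosen for illustration. The sample data has the same shape as the `spark.range(...)` example above.

```scala
// Hypothetical illustration only: none of these types are Comet or Spark classes.
final case class Row(id: Long, value: Long)

// A columnar batch stores each field in its own contiguous array.
final case class ColumnarBatch(ids: Array[Long], values: Array[Long]) {
  def numRows: Int = ids.length
}

object RowToColumnarSketch {
  /** Groups incoming rows into fixed-size chunks and transposes each chunk into columns. */
  def toBatches(rows: Iterator[Row], batchSize: Int): Iterator[ColumnarBatch] = {
    require(batchSize > 0, "batchSize must be positive")
    rows.grouped(batchSize).map { chunk =>
      val ids = new Array[Long](chunk.size)
      val values = new Array[Long](chunk.size)
      var i = 0
      chunk.foreach { row =>
        ids(i) = row.id
        values(i) = row.value
        i += 1
      }
      ColumnarBatch(ids, values)
    }
  }

  // Same shape of data as spark.range(10, 1000, 10).selectExpr("id", "id + 1 as val").
  def exampleRows: Iterator[Row] =
    (10L until 1000L by 10L).iterator.map(id => Row(id, id + 1))
}
```

Calling `RowToColumnarSketch.toBatches(RowToColumnarSketch.exampleRows, 32)` yields batches whose columns can then be handed to columnar operators. A real operator would also need to handle arbitrary schemas, nulls, and Arrow-compatible memory, which this sketch ignores.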

Describe the potential solution

Add CometRowToColumnar operator

Additional context

No response

advancedxy commented 3 months ago

cc @sunchao @viirya

viirya commented 3 months ago

We have thought about this, and it is on our roadmap. As it is not as urgent as other Spark operators, it is not in the first batch of native operators we are implementing.

advancedxy commented 3 months ago

As it is not as urgent as other Spark operators

Do you have a list of operators with priority attached to be supported? I think I can help a bit.

For the CometRowToColumnar operator, I can help support it too.

viirya commented 3 months ago

I think our short-term goal is to make all queries in TPCH and TPCDS run fully with Comet operators, so the operators found in these queries have higher priority. We have already implemented most of them, I think. One missing piece is the Join operator, which I'm working on. It is close to done: it passes all Comet tests and Spark tests, but we are still resolving a few test failures on TPCDS.

And it is not only operators; native expressions are also something we want to deal with.

We have been looking for anything Comet does not yet support in the TPCDS and TPCH queries, such as unsupported operators and expressions. I believe that Parth has been working on the list. cc @parthchandra

sunchao commented 3 months ago

@viirya feel free to break up the tasks for join when you think it is necessary, to improve the parallelism :) (I'm not sure whether some extra work is required for broadcast join atm).

@advancedxy you can also check the existing operators on the Spark side and see if there are some gaps that we should fill.

There are also a bunch of tasks on the DataFusion side, in particular on aggregate and join performance. To name a few:

I think implementing support for the operators is just the start. Getting good performance out of them will also become very important in the future.

sunchao commented 3 months ago

One thing we don't support is InSubqueryExec, which is used in dynamic partition pruning and other things. We do support ScalarSubqueryExec at the moment.

viirya commented 3 months ago

I'm not sure whether some extra work is required for broadcast join atm.

For broadcast join, we basically just need hash join + broadcast in Comet. I have a draft of hash join, and we already have broadcast. Once hash join is ready, it should be easy to get broadcast join by combining them.
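
As a rough illustration of why the two pieces compose, here is a minimal hash join sketch in plain Scala. It is not Comet's or DataFusion's implementation; `BroadcastHashJoinSketch` and `hashJoin` are hypothetical names, and the "broadcast" part is only implied: the hash table built from the small side is what each task would receive a copy of.

```scala
object BroadcastHashJoinSketch {
  /** Inner equi-join: build a hash table on the small side, probe it with the large side. */
  def hashJoin[K, L, R](
      buildSide: Seq[(K, R)],
      probeSide: Iterator[(K, L)]): Iterator[(K, L, R)] = {
    // Build phase: group the small side by join key.
    // In a broadcast join, this table is the thing that gets shipped to every task.
    val table: Map[K, Seq[R]] =
      buildSide.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

    // Probe phase: stream the large side and emit one output row per matching build row.
    probeSide.flatMap { case (k, left) =>
      table.getOrElse(k, Seq.empty).iterator.map(right => (k, left, right))
    }
  }
}
```

For example, joining `Seq(1 -> "a", 2 -> "b", 2 -> "c")` as the build side against `Iterator(2 -> 20.0, 3 -> 30.0, 1 -> 10.0)` drops key 3 (no match) and emits two rows for key 2.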

add spilling for SMJ in DF (@viirya do we have an issue tracking this?)

Just created it: https://github.com/apache/arrow-datafusion/issues/9359

viirya commented 3 months ago

feel free to break up the tasks for join when you think it is necessary, to improve the parallelism

SortMergeJoin support in Comet is one integral work item, like the other items we have finished or are working on, and it makes more sense to work on it as a whole (unless you want to break it into serde code, the CometSortMergeJoinExec operator class, tests, etc. 😂).

There were some prerequisite tasks, and they are finished, e.g., relaxing the join-on expression type and adding join filter support.

Improving DataFusion's SortMergeJoin could be a separate task, as it is orthogonal to adding support in Comet. I am not sure yet where the performance bottleneck is, but in the benchmark I ran earlier, its performance was similar to Spark's, not better.

SortMergeJoin spilling support is another separate task. I created a ticket for that.

advancedxy commented 3 months ago

@advancedxy you can also check the existing operators on the Spark side and see if there are some gaps that we should fill.

Good point. Actually, I am actively evaluating Comet and checking what works and what does not. I may create issues if some operators or expressions are not supported yet.

But I do agree with @viirya's point. We should fully support TPCH and TPC-DS queries in the short term and prioritize operators and expressions on that basis, for both coverage and performance.

advancedxy commented 3 months ago

FYI, I am planning to work on this next week.

Of course, if others are also interested in this, you can comment here and take it over; I can help review and/or provide some help if needed.