apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Add `InSubqueryExec` support #121

Open viirya opened 4 months ago

viirya commented 4 months ago

What is the problem the feature request solves?

From @sunchao https://github.com/apache/arrow-datafusion-comet/issues/119#issuecomment-1965754058

One thing we don't support is InSubqueryExec, which is used in dynamic partition pruning and other things. We do support ScalarSubqueryExec at the moment.

We can add support for InSubqueryExec to Comet. Basically, it should not be too different from, or more difficult than, ScalarSubqueryExec, which we already have.

Describe the potential solution

Support InSubqueryExec in Comet.

Additional context

No response

advancedxy commented 4 months ago

I can work on this if no one is currently working on it.

advancedxy commented 3 months ago

I did some research on supporting InSubqueryExec. I think we should postpone the support a little bit, at least until Comet supports Join operators.

InSubqueryExec is mainly used for:

  1. DPP (dynamic partition pruning), which evaluates the `in` predicate on the driver side.
  2. Some special cases, which actually perform the InSet evaluation on the executor side (for Comet, the native side).

For the first part, it would be pretty straightforward to support on the Comet side, as all the evaluations happen on the driver (JVM) side. We can model it the way InSubqueryExec does: prepare the subqueries first, then do some potential expression and plan transforms, and we are good to go. However, DPP applies to Join operators, so it would be reasonable to add DPP support after we have Join operators in Comet.
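To make the DPP case concrete, here is a minimal sketch of the idea: the subquery is evaluated first, and its result set is used on the driver to drop partitions before any scan task is launched. All names here are illustrative stand-ins, not Comet or Spark APIs.

```python
def prune_partitions(partitions, subquery_values):
    """Keep only partitions whose partition-key value appears in the values
    collected from the subquery. The `in` predicate runs on the driver,
    against partition metadata, not against row data."""
    allowed = set(subquery_values)
    return {key: files for key, files in partitions.items() if key in allowed}

# Hypothetical fact table partitioned by date; the subquery selected two dates.
partitions = {
    "2024-01-01": ["part-0.parquet"],
    "2024-01-02": ["part-1.parquet"],
    "2024-01-03": ["part-2.parquet"],
}
pruned = prune_partitions(partitions, ["2024-01-01", "2024-01-03"])
```

Only the surviving partitions' files are ever scanned, which is why this part of InSubqueryExec never needs to run natively.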

For the second part, it's slightly more complicated. Per my understanding, we have multiple options:

  1. Like we did for ScalarSubqueryExec, we can add an InSubquery PhysicalExpr implementation. The main problem is how to transfer the list data from the JVM to the native side. I'm skeptical about just transferring the Java object array via a JNI call, as the list might be pretty big. Maybe we should convert it to a RecordBatch/CometVector and then pass that to the native side?
  2. Instead of implementing InSubquery, we can rewrite it with the InSet expression, since we already have the subquery list collected before we actually execute the plan. The problems are:
    • Currently, we don't have a way to rewrite/transform the native operator after we have created it.
    • The proto message should have a size limit, something like 64MB? It won't work for a huge InSet.

cc @viirya @sunchao, I'd appreciate any insights you have on this topic.

viirya commented 3 months ago

For the first part, it would be pretty straightforward to support in the Comet side as all the evaluations happens at the driver(/JVM) side.

Hm? For DPP, the only difference is that we don't need to broadcast the evaluation result. I'm not sure about "all evaluations happen at the driver side". I think the subplan still needs to be executed on executors, as it does for ScalarSubqueryExec.

Like we did for ScalarSubqueryExec, we can add a InSubquery PhysicalExpr implementation. The main problem is how to transform the list data from JVM to the native side. I'm skeptical to just transfer the java object array via the JNI call as the list might be pretty big. Maybe we should transform that to a RecordBatch/CometVector and then pass it back to the native side?

I think we can start with an initial version that keeps it simple by passing Java objects through a JNI call. Or, instead, we can keep the output of the subplan in the JVM and evaluate the JVM InSet expression through a JNI call from the native side. InSet evaluation should be fast, as it's only a hash table lookup.
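The "only a hash table lookup" point can be illustrated with a small sketch: once the subquery result is materialized as a hash set, evaluating the predicate over a column is a per-value O(1) probe. The function name and shapes are illustrative, not Spark's actual InSet API.

```python
def eval_in_set(column, in_set_values):
    """Return a boolean mask: True where the column value is in the set."""
    lookup = set(in_set_values)           # built once per query
    return [v in lookup for v in column]  # one hash probe per row

mask = eval_in_set([1, 5, 7, 5], in_set_values=[5, 9])
```

The set construction cost is paid once, so the per-batch work is dominated by cheap lookups regardless of the list's size.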

Instead of implementing InSubquery, we can rewrite it with the InSet expression, since we already have the subquery list collected before we actually execute the plan. The problem is that:

InSubqueryExec is a subquery expression, which is different from InSet. I'm not sure how you plan to rewrite it to an InSet while also keeping the subquery plan.

advancedxy commented 3 months ago

I'm not sure about "all evaluations happen at the driver side"

Emmm, I should be more specific. I meant that all the `in` evaluations happen on the driver side. The subplan/subquery of course needs to be executed/prepared first.

I think we can start with an initial version that keeps it simple by passing Java objects through a JNI call

Hmm, this is always a valid option. I'm wondering whether it would be simple enough to just convert the list of literals into an Arrow ColumnVector, as we could then reuse all the existing infrastructure.

Or, instead, we can keep the output of the subplan in the JVM and evaluate the JVM InSet expression through a JNI call from the native side. InSet evaluation should be fast, as it's only a hash table lookup.

I'm not sure about this approach, and I never considered it as an option. The problem with evaluating the InSet expression on the JVM side is that it would require a RecordBatch-to-InternalRow conversion on the native side (InSubqueryExec requires an InternalRow to evaluate the child), passing that row back to the JVM, and getting the result back. That generally defeats columnar execution?

I'm not sure how do you plan to rewrite it to an InSet but also keep the subquery plan.

Currently, in org.apache.spark.sql.comet.CometNativeExec#doExecuteColumnar, instead of simply copying the serializedPlan message, we would convert it back to an operator message and transform the InSubquery expr into an InSet expr. There's no such utility to support that though, or maybe we are never going to go that way.
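The rewrite described above can be sketched roughly like this: after the subquery result has been collected, walk the deserialized operator tree and replace each InSubquery expression with an InSet carrying the collected values. The classes below are hypothetical stand-ins, not Comet's proto messages, and a real implementation would recurse through the whole plan.

```python
from dataclasses import dataclass

@dataclass
class InSubquery:
    child: str          # e.g. a column reference
    subquery_id: int    # key to look up the collected subquery result

@dataclass
class InSet:
    child: str
    values: tuple       # the materialized literal list

def rewrite(expr, collected):
    """Replace an InSubquery node with an InSet node using the
    already-collected subquery results."""
    if isinstance(expr, InSubquery):
        return InSet(expr.child, tuple(collected[expr.subquery_id]))
    return expr  # a real rewrite would recurse into children here

rewritten = rewrite(InSubquery(child="col_a", subquery_id=0),
                    collected={0: [1, 2, 3]})
```

This also makes the size concern visible: the collected values become part of the rewritten plan message, which is where a proto size limit would bite for a huge list.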