apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Support OneRowRelation #516

Open tshauck opened 1 month ago

tshauck commented 1 month ago

What is the problem the feature request solves?

It has come up a couple of times recently that expressions need scalar tests, but it's not clear whether it's possible to have Comet execute something like SELECT trim('123 ').

Doing a little digging, I think it's because in Spark these queries include OneRowRelation as the scan, which isn't supported by Comet.

spark-sql (default)> SELECT trim('123 ');
24/06/04 10:37:51 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because Scan OneRowRelation is not supported
123
Time taken: 0.066 seconds, Fetched 1 row(s)

My thinking is that if this SparkPlan is supported, then it should be possible to run scalar queries. If this isn't the case, then feel free to close this issue.

Describe the potential solution

Happy to pursue a different approach if there's a better one, but I wanted to open this issue first to see if it's a problem and if this is an appropriate solution.

Potential solution:

  1. OneRowRelation (which is planned as an RDDScanExec) could be implemented as a native operator. DataFusion has a similar PlaceholderRowExec.
  2. Update https://github.com/apache/datafusion-comet/blob/a668a8657a16496781075a014c6009d038c3fa1b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala#L299-L712 to match on an RDDScanExec that is a OneRowRelation, and have it use the aforementioned operator (see the sketch after this list). Currently, val newOp = transform1(op) in the ProjectExec case seems to return None, causing the match arm that should return CometProjectExec to be missed.
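
A minimal sketch of the detection in step 2, assuming only what the warning log above shows: Spark plans OneRowRelation as an RDDScanExec whose nodeName is "Scan OneRowRelation". The helper name is illustrative, not an existing Comet function.

import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}

// Hypothetical helper: detect the physical node that Spark plans for
// OneRowRelation. It shows up as an RDDScanExec whose nodeName is
// "Scan OneRowRelation" (see the warning log above).
def isOneRowRelationScan(op: SparkPlan): Boolean = op match {
  case scan: RDDScanExec => scan.nodeName == "Scan OneRowRelation"
  case _ => false
}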

Additional context

No response

tshauck commented 1 month ago

@kazuyukitanimura I'm wondering if you might have any thoughts on this given some of your recent PR comments / opened issues? Thanks!

kazuyukitanimura commented 1 month ago

@tshauck Thank you for working on this. I need to look into the details, but the name OneRowRelation sounds like it is reading from a table. What about reading Parquet directly? Also, getting the .explain(true) output may help.
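
For reference, a minimal way to get that output from spark-shell, assuming the usual spark session variable is in scope:

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
spark.sql("SELECT trim('123 ')").explain(true)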

viirya commented 1 month ago

OneRowRelation is a logical node. It will be planned as a physical RDDScanExec node that produces a single row with no columns. It should be easy to implement in Comet.
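
For context, Spark's BasicOperators strategy plans it roughly like this (paraphrased from Spark's SparkStrategies.scala; the exact code may differ by version):

// singleRowRdd is a one-partition RDD holding a single empty InternalRow,
// which is why the node has no output columns and prints as "Scan OneRowRelation".
case _: logical.OneRowRelation =>
  execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil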

tshauck commented 1 month ago

Thanks, that's my understanding as well.

Here's the extended explain for reference...

spark-sql (default)> EXPLAIN EXTENDED SELECT trim('123 ');
24/06/05 12:03:58 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because Execute ExplainCommand is not supported
Unsupported op node name: Scan OneRowRelation, Full class path: org.apache.spark.sql.execution.RDDScanExec
24/06/05 12:03:58 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because:
        - Execute ExplainCommand is not supported
        - CommandResult is not supported
== Parsed Logical Plan ==
'Project [unresolvedalias('trim(123 ), None)]
+- OneRowRelation

== Analyzed Logical Plan ==
trim(123 ): string
Project [trim(123 , None) AS trim(123 )#11]
+- OneRowRelation

== Optimized Logical Plan ==
Project [123 AS trim(123 )#11]
+- OneRowRelation

== Physical Plan ==
*(1) Project [123 AS trim(123 )#11]
+- *(1) Scan OneRowRelation[]

Time taken: 0.071 seconds, Fetched 1 row(s)

This is from a debug statement I put in that shows the node that can't be transformed...

Unsupported op node name: Scan OneRowRelation, Full class path: org.apache.spark.sql.execution.RDDScanExec

When a constant is used with a Parquet table, things seem to work fine.

spark-sql (default)> EXPLAIN EXTENDED SELECT trim(col), trim('123 ') FROM qq;
== Parsed Logical Plan ==
'Project [unresolvedalias('trim('col), None), unresolvedalias('trim(123 ), None)]
+- 'UnresolvedRelation [qq], [], false

== Analyzed Logical Plan ==
trim(col): string, trim(123 ): string
Project [trim(col#15, None) AS trim(col)#26, trim(123 , None) AS trim(123 )#27]
+- SubqueryAlias spark_catalog.default.qq
   +- Relation spark_catalog.default.qq[col#15] parquet

== Optimized Logical Plan ==
Project [trim(col#15, None) AS trim(col)#26, 123 AS trim(123 )#27]
+- Relation spark_catalog.default.qq[col#15] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- CometProject [trim(col)#26, trim(123 )#27], [trim(col#15, None) AS trim(col)#26, 123 AS trim(123 )#27]
   +- CometScan parquet spark_catalog.default.qq[col#15] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/thauck/personal/code/github.com/tshauck/arrow-datafusion-c..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col:string>

Time taken: 0.065 seconds, Fetched 1 row(s)

Sounds like it makes sense to add the native node, so I'll give it a shot over the next week or so. Please let me know if you have immediate thoughts, though. Feel free to assign this issue to me.

kazuyukitanimura commented 1 month ago

Thank you @tshauck, I've assigned this to you. The OneRowRelation issue and the scalar test issue are now decoupled. If you would like to separate the tickets, please do so.

tshauck commented 3 weeks ago

@viirya or @kazuyukitanimura, would either of you be able to quickly summarize what steps are needed to implement this? I've been trying, but unfortunately I'm flailing a little bit given my lack of familiarity with DataFusion Comet.

I've got https://github.com/apache/datafusion-comet/compare/main...tshauck:support-one-row-relation?expand=1 so far, which converts the Spark plan into a Comet plan, e.g.

plan: CometProject [trim(123 )#0], [123 AS trim(123 )#0]
+- CometPlaceholderRow Project [123 AS trim(123 )#0], Scan OneRowRelation[]

but I'm not sure a) if this is the right path, and b) if it is, what needs to happen next. I'd appreciate it if you could spend 5 minutes to save me a few hours, thanks!

kazuyukitanimura commented 3 weeks ago

@tshauck All plan transformations (currently) need to start with a Parquet scan node. See https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala#L93. I am guessing you may have to add a case there to trigger the transformation by converting OneRowRelation.
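
A rough sketch of that idea, heavily simplified: CometOneRowRelationExec below is hypothetical (a JVM-side stand-in; a real implementation would map to a Comet-native operator such as DataFusion's PlaceholderRowExec), and the rule structure is illustrative rather than the actual CometExecRule code.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, RDDScanExec, SparkPlan}

// Hypothetical stand-in that produces one empty row, mirroring what
// Spark's RDDScanExec does for OneRowRelation.
case class CometOneRowRelationExec(output: Seq[Attribute]) extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] =
    sparkContext.parallelize(Seq(InternalRow()), 1)
}

// Sketch: let the conversion also start from the one-row scan, not only
// from a Parquet scan, so that parent operators (e.g. ProjectExec) can be
// converted as well.
def convert(plan: SparkPlan): SparkPlan = plan.transformUp {
  case scan: RDDScanExec if scan.nodeName == "Scan OneRowRelation" =>
    CometOneRowRelationExec(scan.output)
}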