danzafar / tidyspark

tidyspark: a tidyverse implementation of SparkR built for simplicity, elegance, and ease of use.

java.lang.UnsupportedOperationException: Cannot evaluate expression: max(input[0, double, true]) #19

Closed: danzafar closed this issue 4 years ago

danzafar commented 4 years ago

I'm seeing this error crop up in a few aggregated filter operations (see #17 and #18). Starting this thread to investigate directly. It seems this command works:

spark_tbl(iris) %>%
    group_by(Species) %>%
    filter(Petal_Length < max(Sepal_Length)) %>%
    collect()

but this command does not:

spark_tbl(iris) %>%
    group_by(Species) %>%
    filter(Petal_Length == max(Sepal_Length)) %>%
    collect()

with error:

Error in handleErrors(returnStatus, conn) : 
  java.lang.UnsupportedOperationException: Cannot evaluate expression: max(input[0, double, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
    at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
        ...
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
    at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:181)
    at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)

So this error is being thrown from inside Spark itself. Let's figure out exactly what's going on here.
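
One possible workaround in the meantime (a sketch, assuming tidyspark's grouped mutate() translates aggregates to window functions the way dplyr and sparklyr do): materialize the grouped max into a column first, then filter on the plain column.

# sketch: compute the window aggregate explicitly, then filter on it
# (max_sepal is just an illustrative column name)
spark_tbl(iris) %>%
    group_by(Species) %>%
    mutate(max_sepal = max(Sepal_Length)) %>%
    filter(Petal_Length == max_sepal) %>%
    collect()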

danzafar commented 4 years ago

OK, the physical plans are pretty different, which makes me think the window is not being applied in the == case:

> spark_tbl(iris) %>%
+     group_by(Species) %>%
+     filter(Petal_Length < max(Sepal_Length)) %>% 
+     explain
== Physical Plan ==
*(2) Project [Sepal_Length#374, Sepal_Width#375, Petal_Length#376, Petal_Width#377, Species#378]
+- *(2) Filter ((isnotnull(Petal_Length#376) && isnotnull(agg_col0#386)) && (Petal_Length#376 < agg_col0#386))
   +- Window [max(Sepal_Length#374) windowspecdefinition(Species#378, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS agg_col0#386], [Species#378]
      +- *(1) Sort [Species#378 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(Species#378, 200)
            +- Scan ExistingRDD[Sepal_Length#374,Sepal_Width#375,Petal_Length#376,Petal_Width#377,Species#378]

> spark_tbl(iris) %>%
+     group_by(Species) %>%
+     filter(Petal_Length == max(Sepal_Length)) %>% 
+     explain
== Physical Plan ==
*(1) Filter (isnotnull(Petal_Length#406) && (Petal_Length#406 = max(Sepal_Length#404)))
+- Scan ExistingRDD[Sepal_Length#404,Sepal_Width#405,Petal_Length#406,Petal_Width#407,Species#408]
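
So the == plan is trying to evaluate max() as a plain row-level predicate inside the Filter, and an AggregateExpression is Unevaluable outside an Aggregate or Window node, which is exactly the doGenCode error in the trace above. For reference, the working < plan amounts to a window rewrite, roughly this (illustrative only; assumes the data is registered as a temp view named iris and that a SparkR-style sql() helper is available):

# illustrative sketch of the window rewrite the < case produces
sql("
  SELECT Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, Species
  FROM (SELECT *,
               max(Sepal_Length) OVER (PARTITION BY Species) AS agg_col0
        FROM iris) t
  WHERE Petal_Length < agg_col0
") %>%
    collect()
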
danzafar commented 4 years ago

This is fixed by the following commits:

https://github.com/danzafar/tidyspark/pull/21/commits/cabddabc6f65f7f83d4d080688739aaf6dbb6039
https://github.com/danzafar/tidyspark/pull/21/commits/d5c532b88a551471b401e6a13de91099dc1e0a3d
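
With those in, both comparison operators should hit the same window-translation path. A quick sanity check (sketch): the == version should now explain to a plan containing a Window node, just like the < case above.

spark_tbl(iris) %>%
    group_by(Species) %>%
    filter(Petal_Length == max(Sepal_Length)) %>%
    explain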