Open PangZhi opened 9 years ago
I tried the following:
val carsDDF = loadMtCarsDDF()
val result = flinkDDFManager.sql("select count(*),cyl from mtcars group by cyl",FlinkConstants.ENGINE_NAME)
import scala.collection.JavaConversions._
result.getRows.foreach(println)
and the result was:
7,6
11,4
14,8
@Shiti Can you try creating another DDF from mtcars and running sql("select count(*), cyl from new_ddf group by cyl")?
I got the error when I tried the following code, and it's not specific to count(*); count(columnName) also throws the same error.
val carsDDF = loadMtCarsDDF()
val result = flinkDDFManager.sql2ddf("select count(mpg),cyl from mtcars group by cyl",FlinkConstants.ENGINE_NAME)
val rows = result.sql("select * from @this","")
rows.getRows.foreach(println)
A temporary workaround is to use an alias for the count operation:
select count(*) as count_by_cyl,cyl from mtcars group by cyl
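One possible reading of why the alias helps, based only on the exception message reported in this thread: the un-aliased aggregate column seems to get a generated name containing a dot ("intermediate.1"), while the lookup that fails is for "intermediate". The sketch below is a hypothetical, self-contained model of that behaviour, not Flink's or DDF's actual code. The object FieldLookupSketch, its resolve method, and the assumption that a dot is treated as a nested-field separator are all illustrative.

```scala
// Hypothetical model of the failure; this is NOT Flink's real implementation.
object FieldLookupSketch {
  // Flat row types as (field name, type name) pairs, mirroring the names in the error message.
  val unaliased: Seq[(String, String)] =
    Seq("intermediate.1" -> "Integer", "cyl" -> "Integer")
  val aliased: Seq[(String, String)] =
    Seq("count_by_cyl" -> "Integer", "cyl" -> "Integer")

  // ASSUMPTION: the resolver treats '.' as a nested-field separator,
  // so only the segment before the first dot is looked up in the row type.
  def resolve(fields: Seq[(String, String)], reference: String): Either[String, String] = {
    val head = reference.takeWhile(_ != '.')
    fields
      .collectFirst { case (name, tpe) if name == head => tpe }
      .toRight("Unable to find field '" + head + "'")
  }

  def main(args: Array[String]): Unit = {
    // The generated name contains a dot, so the lookup is for "intermediate" and fails.
    println(resolve(unaliased, "intermediate.1"))
    // The alias contains no dot, so the whole name is looked up and found.
    println(resolve(aliased, "count_by_cyl"))
  }
}
```

Under this assumption, any alias without a dot would avoid the failing lookup, which is consistent with the workaround above.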
I created a DDF and then tried to run ddfmanager.sql("select count(*), column from ddf group by column"). It fails with the following error message:
Caused by: org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Unable to find field "intermediate" in type org.apache.flink.api.table.Row(intermediate.1: Integer, cyl: Integer).
    at org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.getTypeAt(CaseClassTypeInfo.scala:213)
    at org.apache.flink.api.table.plan.PlanTranslator$$anonfun$1.apply(PlanTranslator.scala:99)
    at org.apache.flink.api.table.plan.PlanTranslator$$anonfun$1.apply(PlanTranslator.scala:99)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.flink.api.table.plan.PlanTranslator.createTable(PlanTranslator.scala:98)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.createTable(ScalaBatchTranslator.scala:48)
    at org.apache.flink.api.scala.table.DataSetConversions.as(DataSetConversions.scala:44)
    at org.apache.flink.api.scala.table.DataSetConversions.toTable(DataSetConversions.scala:63)
    at org.apache.flink.api.scala.table.package$.rowDataSet2Table(package.scala:86)
    at io.ddf.flink.content.DataSetRow2Table.apply(DataSetRow2Table.scala:13)
    at io.ddf.content.RepresentationHandler.createRepresentation(RepresentationHandler.java:238)
    at io.ddf.content.RepresentationHandler.get(RepresentationHandler.java:119)