NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[FEA] Support data type org.apache.spark.mllib.linalg.VectorUDT #4820

Open viadea opened 2 years ago

viadea commented 2 years ago

I wish we could support the data type org.apache.spark.mllib.linalg.VectorUDT.

Mini repro:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rows = spark.sparkContext.parallelize(
  List(
    Row(0.0, 1.2, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0))
  )
)

val schema = List(
  StructField("label", DoubleType, true),
  StructField("feature", DoubleType, true),
  StructField("a_vector", new org.apache.spark.mllib.linalg.VectorUDT, true)
)

val df = spark.createDataFrame(
  rows,
  StructType(schema)
)
// Round-trip the vector column through Parquet, then query it back
df.write.mode("overwrite").format("parquet").save("/tmp/testparquet")
val df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")

spark.sql("select * from df2").collect()

Unsupported messages:

!Exec <FileSourceScanExec> cannot run on GPU because unsupported data types in output: org.apache.spark.mllib.linalg.VectorUDT@f71b0bce [a_vector#93]; unsupported data types org.apache.spark.mllib.linalg.VectorUDT@f71b0bce [a_vector] in read for Parquet

revans2 commented 2 years ago

We can start to put in some support for UDTs, but a UDT is a Java class that provides ways to translate to/from other standard SQL types. We can read in the standard SQL types, but then it is going to take some work to understand exactly when/where the translations to/from the Java type happen in Spark and to make sure we can plumb all of that through. That is the main reason we have not added any support for UDTs yet.

How is this used? Typically someone adds support for a UDT because they want to interact with it as a Java class instead of as a SQL struct, which means we are not likely to be able to do much with it once it is read in except send it to the CPU for more processing.
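
To make that contract concrete, here is a minimal sketch of how VectorUDT maps between the Java class and its SQL representation (the exact struct layout is whatever sqlType reports; the comments below only describe what I would expect it to contain):

import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}

val udt = new VectorUDT

// The SQL-side type the UDT stores its data as; for VectorUDT this is a struct
// that can hold either a dense or a sparse vector (roughly: type, size, indices, values).
println(udt.sqlType)

// serialize turns the Java object into that SQL representation,
// deserialize turns it back into the Java object.
val asSqlData = udt.serialize(Vectors.dense(1.0, 2.0))
val asVector  = udt.deserialize(asSqlData)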

wjxiz1992 commented 2 years ago

It should be a use case from the ML side. Currently our two supported ML cases are XGBoost and PCA, and both of them use VectorUDT in their CPU versions. The entry point for this could be VectorAssembler (merging multiple columns into one) or a customized UDF (casting ArrayType to VectorUDT) like:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

// Convert an array column into an ml Vector column
val convertToVector = udf((array: Seq[Double]) => {
  Vectors.dense(array.toArray)
})

For the first case, XGBoost added support for multiple columns as input. For the second case, PCA also supports ArrayType columns directly.
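
For reference, a minimal sketch of the VectorAssembler entry point (column names just reuse the repro DataFrame above and are only illustrative; note that VectorAssembler produces the ml, not the mllib, vector type):

import org.apache.spark.ml.feature.VectorAssembler

// Merge multiple numeric columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("label", "feature"))
  .setOutputCol("features")

val assembled = assembler.transform(df)
assembled.printSchema()   // "features" is a vector (UDT) column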

revans2 commented 2 years ago

Do you have an example of a full workflow/query that you want to have optimized? Adding support for UDTs is potentially a lot of work, and it would be nice to know which areas we should concentrate on first. Looking at VectorUDT, there are two implementations: one for sparse and another for dense vectors, and each row could be one or the other depending on the data in it. Reading that data out of Parquet should not be too difficult, but how is it going to be used? Are we going to have to support user defined functions that take user defined types? I am just concerned that this is the first layer of an onion: we can add what you are asking for, but I don't think it is going to help in terms of performance until we do a lot of follow-on work too.
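
For context, the two representations look like this (plain MLlib API, nothing plugin specific):

import org.apache.spark.mllib.linalg.Vectors

// Dense vector: every element is stored
val dense  = Vectors.dense(1.0, 0.0, 3.0)

// Sparse vector: only the size plus the non-zero indices and values are stored
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

// A single vector column can mix the two representations row by row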

viadea commented 2 years ago

@revans2 I could share more details offline. Here are some of the needed operators on the vector type, based on the logs (a rough sketch of these patterns follows the list):

  1. input for CollectLimitExec
  2. output for ProjectExec
  3. AttributeReference that produces a vector
  4. input & output for a ScalaUDF
  5. reading/writing vectors to/from Parquet
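
A rough sketch of what those operations look like on the CPU today, reusing df2 from the repro above (the UDF, column, and path names are just illustrative):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}

// (4) a ScalaUDF whose input and output are both vectors
val scaleVector = udf((v: Vector) => Vectors.dense(v.toArray.map(_ * 2.0)))

// (2)(3) project the vector column and the UDF result
val scaled = df2.select(col("a_vector"), scaleVector(col("a_vector")).as("scaled_vector"))

// (5) write the vector columns back to Parquet and read them again
scaled.write.mode("overwrite").parquet("/tmp/testparquet_scaled")
val df3 = spark.read.parquet("/tmp/testparquet_scaled")

// (1) limit + collect typically plans a CollectLimitExec
df3.limit(5).collect()
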
revans2 commented 2 years ago

All of those except CollectLimitExec look to be doable. We do not currently support CollectLimitExec because it is a performance optimization in Spark that we just have not felt the need to support. If you have a real use case that is not just show(), then we should talk about filing an issue to add support for that generally too.