
Spark UDF Serializable caveats #220

Open eubnara opened 3 years ago

eubnara commented 3 years ago

https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/

https://stackoverflow.com/questions/36794688/spark-task-not-serializable-for-udf-on-dataframe
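
A minimal sketch of the failure mode those links describe, assuming a hypothetical non-Serializable helper class (`Normalizer`) and made-up column names. The fix shown is to construct the object inside the UDF closure instead of capturing a driver-side instance:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    // Hypothetical helper that is not Serializable (e.g. wraps a client or parser).
    class Normalizer {
      def normalize(s: String): String = s.trim.toLowerCase
    }

    val spark = SparkSession.builder().master("local[*]").appName("udf-serialization").getOrCreate()
    import spark.implicits._

    val df = Seq("  Foo", "BAR  ").toDF("raw")

    // Problematic: the closure captures a driver-side instance, so Spark tries to
    // serialize it with the task and fails with "Task not serializable".
    // val normalizer = new Normalizer
    // val badUdf = udf { (s: String) => normalizer.normalize(s) }

    // Works: the instance is created inside the closure, so nothing non-serializable
    // is shipped from the driver (use an object or @transient lazy val to avoid
    // re-creating it per row).
    val goodUdf = udf { (s: String) =>
      new Normalizer().normalize(s)
    }

    df.select(goodUdf($"raw").as("normalized")).show(false)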

eubnara commented 3 years ago

https://spark.apache.org/docs/2.3.2/sql-programming-guide.html#data-types
https://spark.apache.org/docs/latest/sql-ref-datatypes.html

An array of string is passed to a Scala UDF as Seq[String]; an array of struct is passed as Seq[Row].
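
For the Seq[String] case, a small sketch (assuming spark.implicits._ is in scope, as in spark-shell; column names are made up). The Seq[Row] case is covered by the sorting example below.

    import org.apache.spark.sql.functions.{split, udf}

    val df = Seq("a,b,c", "d,e").toDF("csv")
      .withColumn("tokens", split($"csv", ","))   // tokens: array<string>

    // The array<string> column arrives in the UDF as Seq[String].
    val joinTokens = udf { (xs: Seq[String]) => xs.mkString("|") }

    df.select($"tokens", joinTokens($"tokens").as("joined")).show(false)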

https://stackoverflow.com/questions/49671354/how-to-sort-array-of-struct-type-in-spark-dataframe-by-particular-column

import java.sql.Date

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, udf}

// In spark-shell, spark.implicits._ is already in scope; otherwise import it
// from the active SparkSession so that .toDF works on local Seqs.
import spark.implicits._

case class ABC(a: Int, b: Int, c: Int)

val first = Seq(
  ABC(1, 2, 3),
  ABC(1, 3, 4),
  ABC(2, 4, 5),
  ABC(2, 5, 6)
).toDF("a", "b", "c")

// Column "c" of `second` is a struct of (date, int).
val second = Seq(
  (1, 2, (Date.valueOf("2018-01-02"), 30)),
  (1, 3, (Date.valueOf("2018-01-01"), 20)),
  (2, 4, (Date.valueOf("2018-01-02"), 50)),
  (2, 5, (Date.valueOf("2018-01-01"), 60))
).toDF("a", "b", "c")

// The array<struct> produced by collect_list arrives in the UDF as Seq[Row];
// sort by the second struct field (the Int) and map each Row back to a tuple.
val sortUdf = udf { (xs: Seq[Row]) =>
  xs.sortBy(_.getAs[Int](1))
    .map { case Row(x: java.sql.Date, y: Int) => (x, y) }
}

first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
     .groupBy("a")
     .agg(sortUdf(collect_list("c2")))
     .show(false)

//+---+----------------------------------+
//|a  |UDF(collect_list(c2, 0, 0))       |
//+---+----------------------------------+
//|1  |[[2018-01-01,20], [2018-01-02,30]]|
//|2  |[[2018-01-02,50], [2018-01-01,60]]|
//+---+----------------------------------+
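
Note that the UDF maps each Row back to a (java.sql.Date, Int) tuple before returning: a Scala UDF cannot return Seq[Row] directly, since Spark has no schema/encoder for Row, whereas tuples (or case classes) are encoded back into an array of structs.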