vincenzobaz / spark-scala3

Apache License 2.0
87 stars 15 forks source link

.collect() not entirely functional when using Scala 3.3.1 #40

Open Anghammar opened 11 months ago

Anghammar commented 11 months ago

I tested to upgrade this project to use Scala 3.3.1 but encountered an issue. It seems that using .collect on a dataset containing some Scala case class causes some generated code to fail to compile. The odd thing is that this only happens if you are running from a main def (either a def with @main or the usual main(args: Array[String]): Unit) but not when running from an object that extends App.

This can be reproduce in the example code StarWars.scala by first changing the Scala version to 3.3.1 in build.sbt, then adding a main method to the StarWars object and delete "extends App". Now if you add .collect on any dataset in the file and run it it should crash with this or a similar error:

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 60, Column 8: No applicable constructor/method found for actual parameters "java.lang.String, java.lang.String"; candidates are: "sql.StarWars$Friends$1(sql.StarWars$, java.lang.String, java.lang.String)"

vincenzobaz commented 11 months ago

Just to be clear, with 3.3.0 the compiling issue within @main does not occur?

Anghammar commented 11 months ago

Just to be clear, with 3.3.0 the compiling issue within @main does not occur?

No not when I tested it.

vincenzobaz commented 11 months ago

Interesting! I wonder if this is due to something different in Scala 3.3.1

michael72 commented 11 months ago

I get different exceptions - and it has nothing to do with Scala 3.3.0 or 3.3.1 - rather with the collect and the StarWars files

I tried collecting the data of the last Dataset sw_ds in the starwars example and got something like:

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 78, Column 29: No applicable constructor/method found for actual parameters "double"; candidates are: "public static java.lang.Integer java.lang.Integer.valueOf(java.lang.String) throws java.lang.NumberFormatException", "public static java.lang.Integer java.lang.Integer.valueOf(int)", "public static java.lang.Integer java.lang.Integer.valueOf(java.lang.String, int) throws java.lang.NumberFormatException"
[info] org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 78, Column 29: No applicable constructor/method found for actual parameters "double"; candidates are: "public static java.lang.Integer java.lang.Integer.valueOf(java.lang.String) throws java.lang.NumberFormatException", "public static java.lang.Integer java.lang.Integer.valueOf(int)", "public static java.lang.Integer java.lang.Integer.valueOf(java.lang.String, int) throws java.lang.NumberFormatException"

This means that internally the call to java.lang.Integer.valueOf(double) failed - because there is actually no such thing.

The reason for this actually lies in the data and the case class definitions - they don't match. You can look at the inferred scheme from the dataframe by just printing the schema - or a bit better formatted with the following print call:

    println(sw_df.schema.fields.map { f =>
      s"${f.name}: ${f.dataType.toString().replace("Type", "")}"
    }.mkString("\n"))

This yields

name: String
gender: String
height: Double
weight: String
eyecolor: String
haircolor: String
skincolor: String
homeland: String
born: String
died: String
jedi: String
species: String
weapon: String
friends: String

which does not fit the case class definition - probably the other types do not fit as well...

For instance the gender is missing - OK - missing fields should not be a problem, but height is actually a Double and was defined as Int => here we have the parsing error. Also weight is a String, not an Int - the String comes from "NA" being in the data and "inferSchema" does not know about "NA" - same for the other columns where NA is present, should actually be mapped to None, but isn't. Unfortunately it is not possible to derive the schema from the case classes here automatically - maybe that is something that this library should provide? If the schema were derived from the case class we would however see other exceptions when reading the data with that. So - the data should be converted to our own types manually I guess.

The error message is a bit irritating - probably in spark + scala2 it is a bit better.

michael72 commented 11 months ago

btw - deriving the schema from the case class T works this way:

List.empty[T].toDS.schema
Anghammar commented 11 months ago

Ah yes I forgot to mention that I also had notice that the later datasets of case classes was broken either way, since I had modified them in my own example. But with the first Dataset[Friends] one should be able to reproduce the error with no other modification.

Anghammar commented 11 months ago

Or just create a small example with a dataset of a case class.

vincenzobaz commented 11 months ago

Should we close this issue and create a new one for deriving schema of case classes?

Anghammar commented 11 months ago

Should we close this issue and create a new one for deriving schema of case classes?

I created the issue based on how to recreate problem. If there is a reformulation of the same issue that more closely describes what is actually wrong then sure.

vincenzobaz commented 11 months ago

Thank you @michael72 for the in-depth analysis!

I had a look at your arguments and I have a few points to highlight:

For instance the gender is missing - OK

Indeed gender is not a member of the case class, so we cannot expect it to be reported by List.empty[SW].toDS.schema.

Unfortunately it is not possible to derive the schema from the case classes here automatically

This is already the case with Spark in Scala 2.x. For example y.as[X]does not drop columns, from what I understand, it checks if the columns of y are a subset of the columns of X.

I think the library is able to derive a schema from case classes as List.empty[SW].toDS.schema yields the right schema for the class. The issue is with casting or checking existing dataframes as we can see with sw_df.schema or sw_df.as[SW].schema. But I believe that this behavior is inherited from Spark on Scala 2, so I am not surprised.

maybe that is something that this library should provide

I think there are already multiple libraries that focus on type safety for spark jobs such as Frameless and Iskra. I would prefer to focus on providing a simple Scala 3 upgrade path to Spark users to accelerate the transition of the ecosystem (users and libraries) to modern Scala.

michael72 commented 11 months ago

I added a PR which gives better error messages than the current code - so the collect wouldn't work, because the as would fail before that. However I couldn't adapt the StarWars example, because I tried using a map function - and that actually leads to the spark engine just hanging. Similar to when I started implementing the UDFs - which when not defined on package level could not be serialized, then spark would silently fail and hang. I think it is a similar problem, but as long as we don't understand why the serialization fails here or if it would be possible to serialize it using scala3 - I don't think a proper implementation of map would be feasible. I tried a few different things - also defining the map function on package level but with no results.

However it was possible to do the data conversion using a udf - but that was kind of difficult. I will past the code here:

on package level:

import scala3udf.{
  // "old" udf doesn't interfer with new scala3udf.udf when renamed
  Udf => udf
}
case class Character(
    name: String,
    height: Double,
    weight: Option[Double],
    eyecolor: Option[String],
    haircolor: Option[String],
    jedi: String,
    species: String
)

def toOption[T](what: Any, parse: Any => T) =
  if (what.toString() == "NA") None else Some(parse(what))

val character = udf(
  (
      name: String,
      height: Double,
      weight: String,
      eyecolor: String,
      haircolor: String,
      jedi: String,
      species: String
  ) =>
    Character(
      name,
      height,
      toOption(weight, _.toString.toDouble),
      toOption(eyecolor, _.toString),
      toOption(haircolor, _.toString),
      jedi,
      species
    )
)

and initialization:

  implicit val spark: SparkSession =
    SparkSession.builder().master("local").getOrCreate
  udf.register(character)

and the use later:

val characters = spark
      .sql(
        "SELECT character(name, height, weight, eyecolor, haircolor, jedi, species)  as character from data"
      )
      .select("character.*")
      .as[Character]

which does the trick. The rest of the code would have to be adapted as well. Using a map function and directly returning Character would be much easier, but the spark application in scala3 then simply hangs.

vincenzobaz commented 11 months ago

as long as we don't understand why the serialization fails here or if it would be possible to serialize it using scala3

The fact you manage to get it working through Option could indicate that the issue could be related to the nullable field... Spark uses it in a very relaxed way. Always any field is nullable unless marked differently. Have you tried setting it always to true for example?

michael72 commented 11 months ago

No, I can't get the map function working (because that is what I would use to get the example running) - and it has nothing to to with Option - small example, that works with Scala 2 (without our import)

import org.apache.spark.sql.SparkSession
import scala3encoders.given

case class A(x: Double)
case class B(x: Int)

object MapSample extends App {
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits._

  val data = List(A(1.0), A(2.3)).toDF().as[A].map {
    case A(x) => B(x.toInt)
  }
  data.show() // with Scala 3 it simply hangs
}

Maybe I should create another ticket?

vincenzobaz commented 11 months ago

Maybe I should create another ticket? Sure! Let's collect all the info there.

Have you tried lowering the logging level to debug to see if it prints something there while hanging?

michael72 commented 11 months ago

I tried with spark.sparkContext.setLogLevel("DEBUG") but didn't get much more information (except generated code). But I can't see where exactly it hangs. I remember investigating further for udfs that lambdas couldn't be serialized - but not sure where that was now. For udfs it works when defining the lambda on package level - for map it doesn't seem to work that simply