vincenzobaz / spark-scala3

Apache License 2.0
89 stars, 15 forks

Help with defining Serializer and Deserializer for FiniteDuration #41

Closed joan38 closed 1 year ago

joan38 commented 1 year ago

Hi,

This is more a request for help than an actual issue. I'm trying to define a Serializer and Deserializer for FiniteDuration as follows:

  given Serializer[FiniteDuration] with
    def inputType: DataType = ObjectType(classOf[FiniteDuration])
    def serialize(inputObject: Expression): Expression =
      val javaDuration = StaticInvoke(
        DurationConverters.getClass,
        ObjectType(classOf[java.time.Duration]),
        "toJava",
        inputObject :: Nil,
        returnNullable = false
      )
      summon[Serializer[String]].serialize(javaDuration)

  given Deserializer[FiniteDuration] with
    def inputType: DataType = DayTimeIntervalType()
    def deserialize(path: Expression): Expression =
      val javaDuration = summon[Deserializer[java.time.Duration]].deserialize(path)
      StaticInvoke(
        DurationConverters.getClass,
        ObjectType(classOf[FiniteDuration]),
        "toScala",
        javaDuration :: Nil,
        returnNullable = false
      )

However, I get the following error:

No applicable constructor/method found for actual parameters "java.time.Duration"; candidates are: "public static org.apache.spark.unsafe.types.UTF8String org.apache.spark.unsafe.types.UTF8String.fromString(java.lang.String)"

Any idea what I missed?

Thanks

joan38 commented 1 year ago

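The fix was to use summon[Serializer[java.time.Duration]] instead of summon[Serializer[String]]. As the error shows, the String serializer passes its input to UTF8String.fromString, which only accepts a java.lang.String, while the StaticInvoke here produces a java.time.Duration: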

  given Serializer[FiniteDuration] with
    def inputType: DataType = ObjectType(classOf[FiniteDuration])
    def serialize(inputObject: Expression): Expression =
      val javaDuration = StaticInvoke(
        DurationConverters.getClass,
        ObjectType(classOf[java.time.Duration]),
        "toJava",
        inputObject :: Nil,
        returnNullable = false
      )
      summon[Serializer[java.time.Duration]].serialize(javaDuration)

  given Deserializer[FiniteDuration] with
    def inputType: DataType = DayTimeIntervalType()
    def deserialize(path: Expression): Expression =
      val javaDuration = summon[Deserializer[java.time.Duration]].deserialize(path)
      StaticInvoke(
        DurationConverters.getClass,
        ObjectType(classOf[FiniteDuration]),
        "toScala",
        javaDuration :: Nil,
        returnNullable = false
      )
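
For anyone reading later, here is a minimal plain-Scala sketch of the conversion that the two StaticInvoke expressions delegate to. It assumes that DurationConverters in the snippets above is scala.jdk.javaapi.DurationConverters (the import is not shown in this thread); roundTrip and durationDemo are hypothetical names used only for illustration.

```scala
import scala.concurrent.duration.*
import scala.jdk.javaapi.DurationConverters

// Assumption: these are the static helpers that the StaticInvoke calls
// refer to by the names "toJava" and "toScala".
def roundTrip(d: FiniteDuration): FiniteDuration =
  // FiniteDuration -> java.time.Duration (the serializer direction)
  val javaDuration: java.time.Duration = DurationConverters.toJava(d)
  // java.time.Duration -> FiniteDuration (the deserializer direction)
  DurationConverters.toScala(javaDuration)

@main def durationDemo(): Unit =
  val original = 90.seconds
  println(DurationConverters.toJava(original)) // a java.time.Duration
  println(roundTrip(original))                 // back to a FiniteDuration
  assert(roundTrip(original) == original)
```

The intermediate value is a java.time.Duration, which is why it has to be handed to a Serializer[java.time.Duration] rather than to the String one.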

joan38 commented 1 year ago

But I'm not able to make it work with an enum:

object StreamTrace:
  enum Type:
    case VideoChangeStream, AudioChangeStream, Other
  given Serializer[StreamTrace.Type] with
    def inputType: DataType = ObjectType(classOf[StreamTrace.Type])
    def serialize(inputObject: Expression): Expression =
      val string = Invoke(inputObject, "toString", StringType, returnNullable = false)
      summon[Serializer[String]].serialize(string)

  given Deserializer[StreamTrace.Type] with
    def inputType: DataType = StringType
    def deserialize(path: Expression): Expression =
      val string = summon[Deserializer[String]].deserialize(path)
      StaticInvoke(
        StreamTrace.Type.getClass,
        ObjectType(classOf[StreamTrace.Type]),
        "valueOf",
        string :: Nil,
        returnNullable = false
      )

Error:

No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static org.apache.spark.unsafe.types.UTF8String org.apache.spark.unsafe.types.UTF8String.fromString(java.lang.String)"

Any idea what's wrong?

vincenzobaz commented 1 year ago

I am afraid we have no logic in place to encode/decode enums yet.

This is a simple enum, so it should be doable. Have you tried ordinal and fromOrdinal (so Ints instead of Strings)? The code also contains the skeleton of an attempt at decoding Java enums that could be relevant: https://github.com/search?q=repo%3Avincenzobaz%2Fspark-scala3%20enum&type=code
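
To make the two options concrete, here is a small plain-Scala 3 sketch (no Spark involved) of the round trips each encoding relies on. TraceType, viaName, viaOrdinal and enumDemo are hypothetical names for illustration; TraceType only mirrors the shape of StreamTrace.Type above.

```scala
// Hypothetical stand-in for StreamTrace.Type
enum TraceType:
  case VideoChangeStream, AudioChangeStream, Other

// String encoding: toString on the way out, valueOf on the way back.
def viaName(t: TraceType): TraceType =
  TraceType.valueOf(t.toString)

// Int encoding: ordinal on the way out, fromOrdinal on the way back.
def viaOrdinal(t: TraceType): TraceType =
  TraceType.fromOrdinal(t.ordinal)

@main def enumDemo(): Unit =
  // Both encodings should restore every case of the enum.
  for t <- TraceType.values do
    assert(viaName(t) == t)
    assert(viaOrdinal(t) == t)
```

One trade-off to keep in mind: stored ordinals change meaning if the cases are reordered, while stored names break if a case is renamed.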

vincenzobaz commented 1 year ago

It looks like it is trying to pass an org.apache.spark.unsafe.types.UTF8String to a function that accepts a java.lang.String. So another lead would be to convert the Spark string to a Java string, for example with toString, as is done multiple times in the class itself: https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L384

In this case you need Invoke rather than StaticInvoke, since toString is an instance method.

joan38 commented 1 year ago

@vincenzobaz thank you so much, your comment helped a lot!

So the mistake was that the toString Invoke returns an ObjectType(classOf[String]), not a StringType. The correct code is:

  given Serializer[StreamTrace.Type] with
    def inputType: DataType = ObjectType(classOf[StreamTrace.Type])
    def serialize(inputObject: Expression): Expression =
      val string = Invoke(inputObject, "toString", ObjectType(classOf[String]), returnNullable = false)
      summon[Serializer[String]].serialize(string)

  given Deserializer[StreamTrace.Type] with
    def inputType: DataType = StringType
    def deserialize(path: Expression): Expression =
      val string = summon[Deserializer[String]].deserialize(path)
      StaticInvoke(
        StreamTrace.Type.getClass,
        ObjectType(classOf[StreamTrace.Type]),
        "valueOf",
        string :: Nil,
        returnNullable = false
      )

I then went further and generalized it to any enum:

  given [E <: Enum: ClassTag]: Serializer[E] with
    def inputType: DataType = ObjectType(summon[ClassTag[E]].runtimeClass)
    def serialize(inputObject: Expression): Expression =
      val string = Invoke(inputObject, "toString", ObjectType(classOf[String]), returnNullable = false)
      summon[Serializer[String]].serialize(string)

  given [E <: Enum: ClassTag]: Deserializer[E] with
    def inputType: DataType = StringType
    def deserialize(path: Expression): Expression =
      val string = summon[Deserializer[String]].deserialize(path)
      StaticInvoke(
        summon[ClassTag[E]].runtimeClass,
        ObjectType(summon[ClassTag[E]].runtimeClass),
        "valueOf",
        string :: Nil,
        returnNullable = false
      )

Thanks

vincenzobaz commented 1 year ago

Very cool!

Would you like to open a PR with this logic? These simple enums can be useful in Spark code (for types, as you did) or for categorical columns in datasets, and more people could benefit from your discoveries. Let me know if I can help in any way :)

joan38 commented 1 year ago

Sure, I will open a PR.