flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialisation as well as the latest Scala version
Apache License 2.0

BigDecMapper not serializable #81

Closed (buinauskas closed this issue 6 months ago)

buinauskas commented 7 months ago

Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class Item(
  id: Long,
  price: BigDecimal,
  currency: String,
)

object Item {
  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}

This compiles fine, but when I run my test suite, the following exception is thrown:

org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1997)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1980)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1959)
  at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
  at com.vinted.search.commons.test.fixtures.FlinkTest$TestSourceExtensions.fromList(FlinkTest.scala:84)
  ...
  Cause: java.io.NotSerializableException: org.apache.flinkx.api.mapper.BigDecMapper
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1369)

I tried marking typeInformation as @transient so that Flink would not send it over the wire, but that did not help. How could I solve this?

Thank you 🙏
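A likely reason @transient makes no difference: the annotation only tells Java serialization to skip that one field when its enclosing instance is written, while the stack trace shows the serializer object itself being written together with everything it references. The sketch below reproduces that shape with plain Java serialization and no Flink on the classpath. PlainMapper, HolderSerializer, Holder and TransientDemo are hypothetical stand-ins, not classes from the library.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for illustration only (not the library's classes).
class PlainMapper // does not extend Serializable
class HolderSerializer(val mapper: PlainMapper) extends Serializable

object Holder {
  // @transient affects how an enclosing instance would serialize this field;
  // it does not change which objects HolderSerializer itself references.
  @transient lazy val serializer: HolderSerializer = new HolderSerializer(new PlainMapper)
}

object TransientDemo {
  // Returns true if Java serialization accepts the whole object graph.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(isJavaSerializable(Holder.serializer))
}
```

Under these assumptions the check fails because of the mapper field inside the serializer, which is why the fix has to be made on the mapper class rather than on the val that holds the type information.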

novakov-alexey commented 6 months ago

Hi @buinauskas

It seems we forgot to make BigDecMapper Serializable. Could you quickly try the following code in your code base? If it works, I can make the same change in this library:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
import org.apache.flinkx.api.serializers._
import java.math.{BigDecimal => JBigDecimal}

// adding the Serializable interface
class BigDecMapper2 extends TypeMapper[scala.BigDecimal, JBigDecimal] with Serializable {
  override def map(a: scala.BigDecimal): JBigDecimal       = a.bigDecimal
  override def contramap(b: JBigDecimal): scala.BigDecimal = scala.BigDecimal(b)
}

object Item {
  // mapper and BigDecimal type information declared locally, ahead of deriving Item
  implicit lazy val bigDecMapper2: TypeMapper[scala.BigDecimal, JBigDecimal] = new BigDecMapper2()
  implicit lazy val bigDecInfo2: TypeInformation[scala.BigDecimal] = mappedTypeInfo[scala.BigDecimal, JBigDecimal]

  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}
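One way to check a mapper like this in isolation, before wiring it into a job, is a plain Java serialization round trip. This is a minimal sketch that runs without Flink: the TypeMapper trait here is a local stand-in that mirrors the two methods used in the snippet above, and RoundTrip is a hypothetical helper, not part of the library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Local stand-in for the library's TypeMapper so the sketch has no Flink dependency.
trait TypeMapper[A, B] {
  def map(a: A): B
  def contramap(b: B): A
}

class BigDecMapper2 extends TypeMapper[scala.BigDecimal, java.math.BigDecimal] with Serializable {
  override def map(a: scala.BigDecimal): java.math.BigDecimal       = a.bigDecimal
  override def contramap(b: java.math.BigDecimal): scala.BigDecimal = scala.BigDecimal(b)
}

object RoundTrip {
  // Writes the value with Java serialization and reads it back.
  // Throws java.io.NotSerializableException if any referenced object is not Serializable.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy  = roundTrip(new BigDecMapper2)
    val price = scala.BigDecimal("19.99")
    // The deserialized mapper should still convert in both directions.
    println(copy.contramap(copy.map(price)) == price)
  }
}
```

Removing `with Serializable` from BigDecMapper2 in this sketch should make roundTrip throw, which matches the cause reported in the stack trace.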

AlexLi-Monster commented 6 months ago

I think Serializable is missing for java.time.Instant as well.

buinauskas commented 6 months ago

Hey @novakov-alexey, sorry for the late response, I was away for the weekend. Making the type mappers serializable solves the problem 👍

novakov-alexey commented 6 months ago

Thanks for confirming. We will do that in the next release as well.

buinauskas commented 6 months ago

Anything I could help with? I would like this to be released sooner to avoid implementing type information manually for these types.

novakov-alexey commented 6 months ago

@buinauskas yes, do you want to make a PR? It will speed things up.

buinauskas commented 6 months ago

@novakov-alexey good, I've submitted a fix.

novakov-alexey commented 6 months ago

Released: https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_2.12/1.18.1_1.1.3/ @buinauskas, thank you for your contribution!

buinauskas commented 6 months ago

Excellent, thank you!