findify / flink-adt

Scala ADT support for Apache Flink
MIT License
51 stars 9 forks source link

Are Scala Enumeration types supported? #28

Open jcho7022 opened 2 years ago

jcho7022 commented 2 years ago

Hi, I'm testing out this library, and it seems that Scala `Enumeration` types aren't supported.

Test code:

/** Reproduction test: sends a heterogeneous collection of `Event` values through a local Flink
  * mini cluster and checks that every element is collected back.
  *
  * NOTE(review): assumes `AdtSerializerTestAssets._` and `AdtSerializerTestAssets.Event._` are
  * imported at file level (for `Event`, `A`, `B`, `SomeCaseClass`, `SomeEnum` and the
  * `fromScalaCollection` extension); those imports are not shown in this snippet.
  */
class AdtSerializerTest {

  /** Mini cluster with one TaskManager and one slot; started/stopped around each test. */
  lazy val cluster: MiniClusterWithClientResource = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(1)
      .setNumberTaskManagers(1)
      .build()
  )

  /** Streaming environment bound to `cluster`.
    *
    * Lazy so it is only built on first use inside a test, i.e. after `beforeEach` has started the
    * cluster. Generic types are disabled so that any type without explicit `TypeInformation` is
    * rejected instead of being handed to the generic (Kryo) serializer.
    */
  lazy val env: StreamExecutionEnvironment = {
    cluster.getTestEnvironment.setAsContext()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.enableCheckpointing(1000)
    env.setRestartStrategy(RestartStrategies.noRestart())
    env.getConfig.disableGenericTypes()
    env
  }

  @BeforeEach
  def beforeEach(): Unit = {
    cluster.before()
  }

  @AfterEach
  def afterEach(): Unit = {
    cluster.after()
  }

  @Test
  def serializerTest(): Unit = {
    // Ascribe the element type explicitly. Otherwise the compiler infers the LUB of A and B
    // (reported as `Event with Serializable`). Because TypeInformation is invariant, the implicit
    // TypeInformation[Event] in Event's companion would not be selected for that type.
    val events = List[Event](
      A("abc", 0.2022f, SomeEnum.BAR),
      B(-22, SomeCaseClass(-0.2022f, SomeEnum.FOO))
    )

    val result = env.fromScalaCollection(events).executeAndCollect(10)

    assertEquals(2, result.size)
  }
}

/** Fixtures for the serializer test: a Scala 2 `Enumeration`, a sealed ADT that uses it, and a
  * helper for turning a Scala collection into a Flink source.
  */
object AdtSerializerTestAssets {

  /** Plain Scala 2 `Enumeration`; its values have type `SomeEnum.SomeEnum` (an alias of `Value`). */
  object SomeEnum extends Enumeration {
    type SomeEnum = Value

    val FOO: SomeEnum = Value("FOO")
    val BAR: SomeEnum = Value("BAR")

    // NOTE(review): flink-adt's derivation reports that it cannot find TypeInformation for
    // `SomeEnum.SomeEnum`. An implicit TypeInformation[SomeEnum] must therefore be supplied by
    // hand: for example a mapped/custom one, or Flink's EnumValueTypeInfo if flink-scala is on
    // the classpath. TODO confirm which option is available in this setup.
  }

  /** Sealed ADT whose TypeInformation is derived by flink-adt. */
  sealed trait Event extends Product with Serializable
  object Event {
    // Field types are written as `SomeEnum.SomeEnum`: at this scope `SomeEnum` alone names the
    // enumeration object, not the type of its values.
    final case class A(someStr: String, someFloat: Float, someEnum: SomeEnum.SomeEnum) extends Event
    final case class B(someLong: Long, someCaseClass: SomeCaseClass) extends Event

    final case class SomeCaseClass(someFloat: Float, someEnum: SomeEnum.SomeEnum)

    // Derivation recurses into every field (including SomeCaseClass), so a TypeInformation for
    // SomeEnum.SomeEnum must be resolvable here. Otherwise this fails at compile time.
    implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation
  }

  /** Adds `fromScalaCollection` to Flink's Java `StreamExecutionEnvironment`. */
  implicit final class FlinkEnvOps(private val env: StreamExecutionEnvironment) extends AnyVal {
    import scala.collection.JavaConverters._

    /** Creates a source emitting the elements of `data`. It passes `typeInformation` explicitly
      * so Flink uses the derived serializer instead of extracting one reflectively.
      */
    def fromScalaCollection[A](data: Seq[A])(implicit typeInformation: TypeInformation[A]): DataStreamSource[A] =
      env.fromCollection(data.asJava, typeInformation)
  }
}

Output:

[Error].../AdtSerializerTest.scala:49: could not find implicit value for parameter typeInformation: org.apache.flink.api.common.typeinfo.TypeInformation[...AdtSerializerTestAssets.Event with Serializable]
[Error]...misc/AdtSerializerTest.scala:71: magnolia: could not find TypeInformation.Typeclass for type ....AdtSerializerTestAssets.SomeEnum.SomeEnum
    in parameter 'someEnum' of product type....AdtSerializerTestAssets.Event.A
    in coproduct type ...AdtSerializerTestAssets.Event

Thank you.

erwan commented 1 year ago

Scala 3 enums can be declared to be compatible with Java enums, and Java enums work out of the box in Flink:

https://docs.scala-lang.org/scala3/reference/enums/enums.html#compatibility-with-java-enums

Edit: I tried it, and Scala 3 enums don't work out of the box in Flink; it seems the Java enum compatibility is not enough. However, it looks like this library does support Scala 3 enums?

mzuehlke commented 1 year ago

There is a test showing support for Scala enums (even without the Java compatibility): https://github.com/findify/flink-adt/blob/8926841b826fbdb0512166e31a75ecd1ee60811a/src/test/scala-3/io/findify/flinkadt/Scala3EnumTest.scala

I guess the "old" Scala 2 `Enumeration` types are just not supported?

Or is it just that the TypeInformation for SomeEnum is missing?