findify / flink-adt

Scala ADT support for Apache Flink
MIT License
51 stars 9 forks source link

error serializing generic case class #9

Closed oker1 closed 3 years ago

oker1 commented 3 years ago

The previous custom implementation (0.3.x) can serialize this both with and without ClassTag. When upgrading from 0.3.x I've encountered a weird issue where flink uses a serializer with a different type parameter than what would be appropriate for a part of the pipeline. I've tried specifying the TypeInfos explicitly for every step of the pipeline, but it stil used a wrong one. I've tried fixing it by adding the ClassTag, and now it uses the correct seriliazer, but it seems like the implicit parameter breaks calling the constructor during deserialization.

A repro case:

  case class Generic[T: ClassTag](a: T, b: ADT)

  it should "serialize generic case class" in {
    val ser = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(null)
    all(ser, Generic(SimpleOption(None), Bar(0)))
  }
oker1 commented 3 years ago

Managed to narrow this down further, the problem is not related to flink/classtags:


case class Generic[T](a: T, b: ADT)

  it should "serialize generic case class" in {
    val ser  = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(null)
    val ser2 = implicitly[TypeInformation[Generic[Simple]]].createSerializer(null)
    all(ser, Generic(SimpleOption(None), Bar(0)))
    all(ser2, Generic(Simple(0, "asd"), Bar(0)))
  }

The second serializer fails, because it's identical to the first one. I think it is caused by the caching in the derivation.

shuttie commented 3 years ago

Should be fixed in 0.4.3