findify / flink-adt

Scala ADT support for Apache Flink

Serializing circe.Json in Flink #26

Open sami-badawi opened 2 years ago

sami-badawi commented 2 years ago

I am having trouble using flink-adt to serialize JSON values of type circe.Json.

Approach 1

  implicit val jsonInfo: TypeInformation[Json] =
    deriveTypeInformation[Json]

Here is the error message:

magnolia: child class BiggerDecimalJsonNumber of class JsonNumber is neither final nor a case class

Approach 2

  implicit val jsonInfo: TypeInformation[Json] =
    TypeInformation.of(classOf[Json])

It compiles but crashes when I run the unit test.

Approach 3

Based on the MappedTypeInfoTest.scala unit test, I created my own version:

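// Requires circe on the test classpath. The flink-adt imports (TypeMapper, TestUtils,
// implicit derivation) are assumed to match those in MappedTypeInfoTest.scala.
import io.circe.Json
import io.circe.parser.parse
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

object CirceMappedTypeInfoTest {
  // Serializes Json through its compact String representation.
  class WrappedMapper extends TypeMapper[Json, String] {
    override def map(json: Json): String = json.noSpaces

    // Input that fails to parse falls back to Json.Null.
    override def contramap(jsonString: String): Json =
      parse(jsonString) match {
        case Left(_)      => Json.Null
        case Right(value) => value
      }
  }
  implicit val mapper: TypeMapper[Json, String] = new WrappedMapper()
}

class CirceMappedTypeInfoTest extends AnyFlatSpec with Matchers with TestUtils {
  import CirceMappedTypeInfoTest._
  it should "derive TI for non-serializeable classes" in {
    drop(implicitly[TypeInformation[Json]])
    val ti = implicitly[TypeInformation[Json]]
    val ser = ti.createSerializer(null)
    assert(ser != null)
  }
}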
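
The String round trip that the mapper relies on can be checked on its own, without Flink. The following is only a minimal sketch, assuming circe-core and circe-parser are on the classpath; the object name, the helper names `toWire` and `fromWire`, and the sample document are made up for illustration.

```scala
import io.circe.Json
import io.circe.parser.parse

object JsonRoundTripSketch {
  // Same conversions as WrappedMapper, written as plain functions (hypothetical names).
  def toWire(json: Json): String = json.noSpaces

  // Like the mapper's contramap, unparseable input becomes Json.Null.
  def fromWire(s: String): Json = parse(s).getOrElse(Json.Null)

  def main(args: Array[String]): Unit = {
    val doc = Json.obj(
      "name"    -> Json.fromString("flink"),
      "enabled" -> Json.fromBoolean(true),
      "tags"    -> Json.arr(Json.fromString("a"), Json.fromString("b"))
    )
    // A value survives the trip through its compact String form.
    println(fromWire(toWire(doc)) == doc)
    // Corrupt input is silently mapped to Json.Null.
    println(fromWire("not json") == Json.Null)
  }
}
```

One consequence of the `Json.Null` fallback is that a corrupt record cannot be told apart from a genuine JSON null after deserialization, so throwing on `Left` may be preferable in some pipelines.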

And it failed with this error message:

[info] - should derive TI for non-serializeable classes *** FAILED ***
[info]   java.lang.NullPointerException:
[info]   at org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton.equals(TypeSerializerSingleton.java:43)
[info]   at io.findify.flinkadt.api.serializer.MappedSerializer.equals(MappedSerializer.scala:17)
[info]   at io.findify.flinkadt.CirceMappedTypeInfoTest.$anonfun$new$1(CirceMappedTypeInfoTest.scala:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)

Question

I would think that circe.Json format is very common among flink-adt users.

Am I just using flink-adt badly with circe.Json, or is this a hard class to serialize?

sami-badawi commented 2 years ago

I debugged the exception at MappedSerializer.scala:17 and changed the code from:

Failing

case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] {
  override def equals(obj: Any): Boolean = ser.equals(obj)

To:

Not failing

case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] {
  override def equals(obj: Any): Boolean = if (obj == null) false else ser.equals(obj)
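
The difference between the two versions can be reproduced without Flink. This is a minimal sketch under stated assumptions: `StrictSerializer`, `FailingWrapper` and `GuardedWrapper` are hypothetical stand-ins, and `StrictSerializer.equals` only imitates an `equals` that dereferences its argument without a null check. It is not Flink's or flink-adt's actual code.

```scala
import scala.util.Try

object NullSafeEqualsSketch {
  // Hypothetical stand-in: equals dereferences its argument, so a null argument throws.
  class StrictSerializer {
    override def equals(obj: Any): Boolean = obj.getClass == getClass
    override def hashCode(): Int = getClass.hashCode
  }

  // Mirrors the failing version: the argument is forwarded unchecked.
  class FailingWrapper(ser: StrictSerializer) {
    override def equals(obj: Any): Boolean = ser.equals(obj)
    override def hashCode(): Int = ser.hashCode
  }

  // Mirrors the non-failing version: null is rejected before delegating.
  class GuardedWrapper(ser: StrictSerializer) {
    override def equals(obj: Any): Boolean = obj match {
      case null  => false
      case other => ser.equals(other)
    }
    override def hashCode(): Int = ser.hashCode
  }

  def main(args: Array[String]): Unit = {
    val ser = new StrictSerializer
    // The unguarded wrapper propagates the NullPointerException.
    println(Try(new FailingWrapper(ser).equals(null)).isFailure)
    // The guarded wrapper simply reports "not equal".
    println(new GuardedWrapper(ser).equals(null))
  }
}
```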

NickBurkard commented 2 years ago

This should be easily fixable by updating the above method to use ==, which inherently checks for null before checking value equality. I'll cut a PR to address this issue & ensure circe JSON is supported @sami-badawi.
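
For reference, here is a small sketch of how Scala's `==` treats null on each side of the comparison. It is worth checking which side the null is on, since `a == b` only guards against a null receiver `a`. When `a` is non-null it still calls `a.equals(b)`, even if `b` is null. The class `Strict` is a hypothetical example, not part of Flink or flink-adt, and the null is bound to a val so that the comparison goes through the ordinary equality path.

```scala
import scala.util.Try

object EqEqNullSketch {
  // Hypothetical class whose equals dereferences its argument without a null check.
  class Strict {
    override def equals(obj: Any): Boolean = obj.getClass == getClass
    override def hashCode(): Int = 0
  }

  def main(args: Array[String]): Unit = {
    val strict = new Strict
    val absent: Strict = null

    // Null receiver: == falls back to a reference check and never calls equals.
    println(absent == strict)

    // Non-null receiver, null argument: == delegates to strict.equals(null), which throws here.
    println(Try(strict == absent).isFailure)

    // eq compares references only, so it is safe regardless of how equals is written.
    println(strict eq absent)
  }
}
```

So a null guard inside `equals` itself (or an `equals` on the wrapped serializer that tolerates null) is still needed when the argument, rather than the receiver, may be null.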