flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API which supports Scala types for serialisation as well as the latest Scala versions
Apache License 2.0

Derive type information for scala Map #134

Closed: lilyevsky closed this issue 2 months ago

lilyevsky commented 3 months ago

It looks like the deriveTypeInformation macro does not work for Map[K, V] types. It fails with the error

magnolia: could not infer serializers.Typeclass for type Map[........

Is this expected? If Map[K, V] is indeed not supported currently, are there any plans to add such support?

lilyevsky commented 3 months ago

Obviously I can use TypeInformation.of(classOf[Map[K, V]]) instead of deriveTypeInformation, but then it comes back as a GenericType without any type information for the key or value. I guess in this case the serialization will not be as efficient as it would be with the official Scala API, which can in fact derive such information.

Is there any workaround?
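
To make both approaches concrete, here is a minimal sketch (the key and value types are arbitrary):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

// Deriving directly for a bare Map type fails with the Magnolia error quoted above:
// val derived = deriveTypeInformation[Map[String, Long]]

// The fallback compiles, but only yields a generic TypeInformation
// without any key/value type information:
val generic: TypeInformation[Map[String, Long]] =
  TypeInformation.of(classOf[Map[String, Long]])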

novakov-alexey commented 3 months ago

Could you show a small code example which raises that Magnolia error?

I am not sure about pure Map[K, V] support as the type T for the deriveTypeInformation macro. Maybe it is a bug.

Can you try to infer TypeInformation via implicitly? For example:

import org.apache.flinkx.api.serializers._ 

case class SimpleClass1(a: String, b: Int)

implicitly[TypeInformation[Map[String, SimpleClass1]]]

lilyevsky commented 2 months ago

Thanks Alexey,

Using 'implicitly' indeed eliminated that error.

However, now I am facing more issues getting TypeInformation for the types I need.

In our code we deal with Avro objects; the object type that needs to be serialized is a case class that has a few Avro fields. I need to get TypeInformation for those Avro classes and have them available as implicit values, so the system will pick them up when making serializers for the containing class.

Here is an example. I have a class 'com.liquidnet.sea.order.analytics.avro.CalcIssue', which is a Java class created by the Avro compiler. If I do

implicitly[TypeInformation[CalcIssue]]

I get the following error:

could not find implicit value for parameter e: org.apache.flink.api.common.typeinfo.TypeInformation[com.liquidnet.sea.order.analytics.avro.CalcIssue]
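
To spell out the pattern I am after, here is a rough sketch (OrderReport and its fields are hypothetical; CalcIssue is the Avro-generated class):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
import com.liquidnet.sea.order.analytics.avro.CalcIssue

// Hypothetical containing class; the real one carries a few Avro fields like this.
case class OrderReport(id: String, issue: CalcIssue)

// The failing step is resolving a TypeInformation for the Avro class itself:
//   implicitly[TypeInformation[CalcIssue]]
// With such an implicit in scope, the containing class could presumably be derived:
//   implicit val orderReportInfo: TypeInformation[OrderReport] = deriveTypeInformation[OrderReport]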

Would you have any examples of how to deal with Avro in flink-extended?

FYI: currently we use the official Flink Scala API, and it works fine there with the createTypeInformation macro. I wanted to experiment with flink-extended because Flink's Scala support is deprecated, and also we may need to move to newer Scala versions in the future. It would be great if I could make flink-extended work for our projects.

novakov-alexey commented 2 months ago

Ok, thanks for confirming.

As for the Avro-generated Java class, it is a bit more complicated. Since it is neither a product type nor a case class, flink-extended will fail to derive an implicit TypeInformation.

A better way would be to compile the Avro schema to Scala case classes. Then it should work (I need to check to be sure).

See this project as an example: https://github.com/novakov-alexey/flink-savepoint-scala212/ Here is the compiler trigger: https://github.com/novakov-alexey/flink-savepoint-scala212/blob/main/build.sbt#L52
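
For reference, the trigger boils down to something like the following sketch, assuming sbt-avrohugger is the plugin in use (the linked build.sbt may differ in the details):

// build.sbt (sketch): with sbt-avrohugger added to project/plugins.sbt,
// generate SpecificRecord case classes from the .avsc files before compilation.
Compile / sourceGenerators += (Compile / avroScalaGenerateSpecific).taskValue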

This generates a Scala case class similar to the following.

Input:

{
    "type": "record",
    "namespace": "com.github.novakovalexey",
    "name": "Event",
    "fields": [
        {
            "name": "number",
            "type": "int"
        },
        {
            "name": "count",
            "type": "int",
            "default": 1
        }
    ]
}

Generated output:

import scala.annotation.switch

final case class Event(var number: Int, var count: Int = 1) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, 1)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        number
      }.asInstanceOf[AnyRef]
      case 1 => {
        count
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.number = {
        value
      }.asInstanceOf[Int]
      case 1 => this.count = {
        value
      }.asInstanceOf[Int]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = com.github.novakovalexey.Event.SCHEMA$
}

object Event {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"com.github.novakovalexey\",\"fields\":[{\"name\":\"number\",\"type\":\"int\"},{\"name\":\"count\",\"type\":\"int\",\"default\":1}]}")
}

Although that project uses the official scala-api, I think it should work with flink-extended as well. It would be great if you could check it.
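
A quick way to check would be a sketch like this, assuming the generated Event case class above is on the classpath:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
import com.github.novakovalexey.Event

// If derivation handles the avrohugger-generated case class, this should compile
// and produce a typed (non-generic) TypeInformation for Event.
val eventInfo: TypeInformation[Event] = implicitly[TypeInformation[Event]]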

lilyevsky commented 2 months ago

Thanks, maybe I will look at this later. I understand that flink-extended will work much better with real Scala classes. Our problem is that we have a big project which relies on Flink's Kafka support. That includes special Kafka sources and sinks, support for the schema registry, etc. Naturally, such Kafka sources produce Java Avro classes. If we want Scala Avro classes instead, we would either need some other library or have to create our own serializers and such. That is just a lot of work at this point.