ariskk / flink4s

Scala 3.x wrapper for Apache Flink
MIT License
49 stars 10 forks

Kryo serializer question #19

Open sami-badawi opened 2 years ago

sami-badawi commented 2 years ago

As far as I understand, there is a problem with using the Kryo serializer from Scala in Flink 1.15:

https://flink.apache.org/2022/02/22/scala-free.html

https://www.mail-archive.com/user@flink.apache.org/msg45263.html

I have been looking through flink4s and I do not see any references to the Kryo serializer. I see two uses of asJava:

      override def asyncInvoke(in: T, result: ResultFuture[R]) =
        f(in).asJava.whenComplete { (response, error) =>

  def fromCollection[T](data: Seq[T])(using typeInfo: TypeInformation[T]): DataStream[T] =
    DataStream(javaEnv.fromCollection(data.asJava, typeInfo))

Is that taking care of this problem or do you need to enhance the serializer to get it to work?

ariskk commented 2 years ago

The Scala-specific serializers were created by the createTypeInfo macro, which has been a no-go in large projects due to very slow compilation times. Doing without them carries a performance penalty, but it is negligible for many applications. This project uses the Java serializers directly (i.e. implicit val typeInfo = TypeInformation.of(classOf[SomeType])) and I haven't noticed any issues so far. Please give it a go and let me know if you find any runtime issues!
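A minimal sketch of what the approach above could look like in user code, written as a Scala 3 given so it can satisfy a `using TypeInformation[T]` parameter such as the one on fromCollection. The `SensorReading` case class is a hypothetical example type, not something from flink4s. As far as I know, a Scala case class does not meet Flink's POJO rules (no public no-arg constructor or setters), so Flink's Java type extraction would treat it as a generic type and serialize it with Kryo at runtime; treat that as an assumption to verify against your Flink version.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical example type, used only for illustration.
final case class SensorReading(id: String, value: Double)

// Derive the type information through Flink's Java type extraction,
// without the Scala createTypeInfo macro.
given sensorReadingTypeInfo: TypeInformation[SensorReading] =
  TypeInformation.of(classOf[SensorReading])
```

With this given in scope, a call that takes `(using typeInfo: TypeInformation[T])` should pick it up implicitly for `T = SensorReading`.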

sami-badawi commented 2 years ago

This is interesting. I would use the Java serializers without hesitation. The unit test runs fine on my M1 Mac, so maybe that is fine.

I did try to set up a small sample project based on the README.md, but I am not sure what imports to use. Here is what I currently have:

val flinkLibs = Seq(
  "com.ariskk" %% "flink4s" % "1.15.0",
  "org.apache.flink" % "flink-streaming-java" % flinkVersion,
  "org.apache.flink" % "flink-core" % flinkVersion
)

But I am getting this error:

[error] java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
[error]     at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)

ariskk commented 2 years ago

Hmm, that's odd. I am currently doing the work to publish the library for Scala 2.13, so I will look into it as part of that.
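For readers who hit the same exception: in plain Flink projects, "No ExecutorFactory found to execute the application" commonly appears when the flink-clients module is missing from the classpath, since that module provides the executors used for local runs. Whether this was the cause in this thread is not confirmed. A sketch of the dependency list from above with flink-clients added, where the value of `flinkVersion` is an assumption chosen to match the flink4s version:

```scala
// build.sbt fragment (sketch)
// Assumption: flinkVersion is not shown in the thread; 1.15.0 is a guess.
val flinkVersion = "1.15.0"

val flinkLibs = Seq(
  "com.ariskk" %% "flink4s" % "1.15.0",
  "org.apache.flink" % "flink-streaming-java" % flinkVersion,
  "org.apache.flink" % "flink-core" % flinkVersion,
  // Provides the executor factories needed to run a job locally.
  "org.apache.flink" % "flink-clients" % flinkVersion
)
```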