flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialisation as well as the latest Scala version
Apache License 2.0
70 stars, 15 forks

Fix case class serializer snapshot compatibility #102

Closed IgnasD closed 8 months ago

IgnasD commented 8 months ago

In Flink session-mode clusters, resolveSchemaCompatibility returns INCOMPATIBLE even for identical case classes (even without any recompilation or re-upload to the cluster). This is due to the way Flink restores state, which results in the same class being loaded multiple times by different classloaders.

Steps to reproduce:

  1. Start a Flink cluster in session mode.
  2. Start a job with a single operator that stores a case class in its state.
  3. Wait for a checkpoint.
  4. Kill the job.
  5. Restart the job from the checkpoint. Flink fails to initialise the operator that holds the case class in its state.

To mitigate the issue, I've changed the outer check from direct class equality, which in reality is just reference equality, to a comparison of class names.
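For illustration, here is a minimal standalone sketch of the behaviour described above. It is not the actual patch and does not use Flink. `Event`, `IsolatingLoader` and `sameClassByName` are hypothetical names made up for this example, and the loader only imitates what happens when a second classloader defines the same class again. It assumes Java 9+ (for `readAllBytes`) and that the compiled class file is reachable as a resource, which is normally the case for a compiled application.

```scala
// Hypothetical stand-in for a case class kept in operator state.
final case class Event(id: Long)

// Hypothetical loader: defines `target` itself from the class file bytes and
// delegates every other class to `delegate`. This imitates a second
// classloader loading a class that is already loaded elsewhere.
final class IsolatingLoader(target: String, delegate: ClassLoader)
    extends ClassLoader(delegate) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name != target) super.loadClass(name, resolve)
    else {
      val already = findLoadedClass(name)
      if (already != null) already
      else {
        val path = name.replace('.', '/') + ".class"
        val in   = delegate.getResourceAsStream(path)
        if (in == null) throw new ClassNotFoundException(name)
        // readAllBytes requires Java 9 or newer.
        val bytes = try in.readAllBytes() finally in.close()
        defineClass(name, bytes, 0, bytes.length)
      }
    }
}

object ClassIdentityDemo {
  // Name-based comparison, in the spirit of the mitigation described above.
  def sameClassByName(a: Class[_], b: Class[_]): Boolean =
    a.getName == b.getName

  def main(args: Array[String]): Unit = {
    val original = classOf[Event]
    val reloaded =
      new IsolatingLoader(original.getName, original.getClassLoader)
        .loadClass(original.getName)

    // Class equality is reference equality, so the two copies differ.
    println(original == reloaded)                // false
    // Their fully qualified names are still identical.
    println(sameClassByName(original, reloaded)) // true
  }
}
```

Comparing names accepts both copies as the same class, which is the trade-off this change makes: it no longer distinguishes classes that share a name but were defined by different classloaders.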

novakov-alexey commented 8 months ago

@IgnasD awesome! Thanks a lot.

To my knowledge, Flink uses at least two classloaders: the system classloader to load Flink itself, and a user classloader to load any user JARs that are part of a Flink job. I guess the issue happens in session cluster mode because a Scala case class from a checkpoint is loaded by the parent classloader, while the same class is already loaded by the user classloader. I agree that a class equality check is not the proper way to check whether classes are compatible. Perhaps the proper way is to use Scala's own tooling (scala-reflect, etc.). Anyway, I am fine with applying this change to make it work.