flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API which supports Scala types for serialisation as well as the latest Scala versions
Apache License 2.0

Case class evolution: ArrayIndexOutOfBoundsException #80

Closed: AlexLi-Monster closed this issue 6 months ago

AlexLi-Monster commented 8 months ago

First of all, thank you for providing this library; it rescued me from the schema/state migration pain in Flink.

I have a question about version compatibility with Flink. I notice the oldest version of this lib is 1.15.4_1.0.0, which means it needs at least Flink 1.15. But I tested it with the following:

ThisBuild / version := "0.0.1"
ThisBuild / scalaVersion := "2.12.17"

val flinkVersion = "1.13.2"

val flinkDependencies = Seq(
  //  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  //  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  ("org.apache.flink" %% "flink-clients" % flinkVersion)
    .exclude("org.apache.flink", "flink-runtime_2.12")
    .exclude("org.apache.flink", "flink-streaming-java_2.12"),
  "org.flinkextended" %% "flink-scala-api" % "1.15.4_1.1.1"
)

It works fine with simple case class schema evolution, like adding fields or changing field types, even though it is Flink 1.13.2 (to align with AWS). I would like to confirm whether there are any potential issues with mixing this lib 1.15.4_1.0.0 with Flink 1.13.2. Currently we can re-bootstrap the job in prod, but we want to avoid re-bootstrapping once it is fully deployed in prod, and instead use state migration with snapshots in AWS.

Also, is there any example of how to use this library's documented ability to "be extended with custom serializers even for deeply-nested types, as it uses implicitly available serializers in the current scope"?

Thanks a lot in advance.

novakov-alexey commented 8 months ago

Hi @AlexLi-Monster .

You are welcome. The library version only means that we compile it with a specific Flink version before publishing it to Maven Central. It is essentially a check that the Flink public API is there and can be used by this library.

If it works with your Flink 1.13.2 cluster, then it may keep working going forward. Since this library does not use Flink's official Scala API (which is still in the Apache project), the probability of issues when you migrate your state from 1.13.x to a new Flink version such as 1.18.x is minimal. It will all depend on the quality of the Flink API itself; this library is quite static in terms of future changes.

As for an example of deeply-nested types, do you have any particular issue or case for me to look at?
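
In general, the idea is that any implicit TypeInformation already in scope is picked up while deriving the outer type, even for nested fields. A minimal sketch of the mechanism (the IpAddress/Host types are hypothetical; here the nested instance is simply derived, but it could just as well be a hand-written custom one):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class IpAddress(value: String)
case class Host(name: String, ip: IpAddress)

object Host {
  // the nested instance defined first is reused when the outer type is derived
  implicit val ipTypeInfo: TypeInformation[IpAddress] = deriveTypeInformation
  implicit val hostTypeInfo: TypeInformation[Host]    = deriveTypeInformation
}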

AlexLi-Monster commented 8 months ago

Thanks @novakov-alexey, I do have a gist to show what I am trying to do. This is with Flink 1.15: https://gist.github.com/AlexLi-Monster/036b12cc4b23732485fb027fc74e4112

In short, I borrowed the code from a book. The app uses SensorSource to generate a DataStream[SensorReading], and the data is then processed by a KeyedProcessFunction[String, SensorReading, Output].

What I am trying to do is use flink-scala-api to evolve the case class SensorReading, but it doesn't seem to work. Steps:

  1. Run locally with the fat JAR, where the case class SensorReading doesn't have the field location.
  2. The app runs fine in Flink and produces proper output in stdout; create a savepoint here.
  3. Add a field location to case class SensorReading with a default value.
  4. Stop the job and run the new JAR with the savepoint.
  5. See exceptions like:
    Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
    Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 3
    at scala.collection.mutable.WrappedArray$ofRef.apply(WrappedArray.scala:129)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror4.jinvokeraw(JavaMirrors.scala:407)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:342)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:358)
    at org.apache.flinkx.api.serializer.ConstructorCompat.$anonfun$lookupConstructor$3(ConstructorCompat.scala:39)
    at org.apache.flinkx.api.serializer.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:46)
    at org.apache.flinkx.api.serializer.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:29)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.deserialize(CaseClassSerializer.scala:124)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174)
    ... 19 more

I am not sure what I have done incorrectly. I thought using the lib to serialise the case class should allow state evolution via:

object SensorReading {
  // assumes the library implicits are in scope, e.g. import org.apache.flinkx.api.serializers._
  implicit lazy val sensorReadingInfo: TypeInformation[SensorReading] = deriveTypeInformation
}

Evolving the case class Output does not have issues, though. Could you help point out what I am missing here? Thanks a lot in advance!

novakov-alexey commented 8 months ago

@AlexLi-Monster I thought it should be supported as well. Maybe @shuttie knows for sure whether this case is supposed to work?

It seems the issue is somewhere in the Scala 2 reflection library. Although the 4th constructor parameter has a default value, Scala 2 reflection fails to handle a 4-argument constructor when only 3 argument values are available from the savepoint state.

novakov-alexey commented 8 months ago

It seems we select the wrong constructor here: https://github.com/flink-extended/flink-scala-api/blob/eb74983b9fd6bd009f0283f8896de4bd81edb3d0/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala#L11

AlexLi-Monster commented 8 months ago

Hi @novakov-alexey, thanks for taking care of this. Would there be a beta fix version I can test out for now, or do we need to confirm how this works with @shuttie first?

Thanks!

novakov-alexey commented 8 months ago

Hi @AlexLi-Monster, we need to confirm with @shuttie how it is supposed to work. Another option is that you can try to dive into the Scala 2 reflection library yourself to see how to instantiate a case class from an array of arguments that is smaller than the constructor's argument count.
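
For anyone digging in, one way to do that with plain Java reflection is to fall back to the compiler-generated apply$default$N methods on the companion object for the missing trailing arguments. A rough sketch only, not this library's actual fix (SensorReading and the helper are hypothetical, and a single public constructor is assumed):

case class SensorReading(id: String, ts: Long, temp: Double, location: String = "unknown")

object DefaultArgsSketch {
  // Builds an instance even when fewer argument values than constructor parameters are available,
  // filling the missing trailing parameters from the companion's apply$default$N methods.
  def instantiateWithDefaults[T](clazz: Class[T], args: Array[AnyRef]): T = {
    val ctor      = clazz.getConstructors.head // assumes a single public constructor
    val arity     = ctor.getParameterCount
    val companion = Class.forName(clazz.getName + "$").getField("MODULE$").get(null)
    val defaults = (args.length + 1 to arity).map { i =>
      companion.getClass.getMethod(s"apply$$default$$$i").invoke(companion)
    }
    ctor.newInstance(args ++ defaults: _*).asInstanceOf[T]
  }
}

// e.g. instantiateWithDefaults(classOf[SensorReading], Array("s1", Long.box(1L), Double.box(20.5)))
// yields SensorReading("s1", 1L, 20.5, "unknown")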

novakov-alexey commented 8 months ago

It seems I have found a way to support default arguments in case classes upon de/serialization with Scala 2. Scala 3 will require some research.

AlexLi-Monster commented 7 months ago

> It seems I have found a way to support default arguments in case classes upon de/serialization with Scala 2. Scala 3 will require some research.

Hi @novakov-alexey, how did you fix it in Scala 2? I didn't manage to do it due to my limited Scala knowledge and would like to try out your fix by building the lib locally.

novakov-alexey commented 7 months ago

@AlexLi-Monster can you try to build a local version from this PR https://github.com/flink-extended/flink-scala-api/pull/98 and test your case? You can also try to add more test cases to SchemaEvolutionTest to capture any other useful examples of schema evolution.

AlexLi-Monster commented 7 months ago

> @AlexLi-Monster can you try to build a local version from this PR #98 and test your case? You can also try to add more test cases to SchemaEvolutionTest to capture any other useful examples of schema evolution.

Sorry, I might need more time to verify this; I am quite busy with the current project dealing with savepoints from Flink's Kryo serialisation. We would like to use this lib for serialisation, but we want to migrate the savepoint rather than restore without a snapshot :/

AlexLi-Monster commented 7 months ago

Hi @novakov-alexey, when I updated the jar to the branch version, I got the following error that it cannot find the

implicit m: _root_.scala.deriving.Mirror.Of[T]

This didn't happen with the master branch. Do I need to add something here? I am using:

ThisBuild / scalaVersion := "2.12.17"
val flinkVersion = "1.15.2"
[Screenshot attached: 2024-03-08, 9:01 am]

AlexLi-Monster commented 7 months ago

Ah, I think I compiled the lib JAR using Scala 3; once I switched to Scala 2 it compiles now. However, when I run sbt clean assembly with the lib as before, it throws an exception like the following:

java.util.NoSuchElementException: head of empty list
scala.collection.immutable.Nil$.head(List.scala:469)
scala.collection.immutable.Nil$.head(List.scala:466)
scala.tools.nsc.typechecker.ContextErrors$InferencerContextErrors$InferErrorGen$.NotWithinBoundsErrorMessage(ContextErrors.scala:1064)
scala.tools.nsc.typechecker.ContextErrors$InferencerContextErrors$InferErrorGen$.NotWithinBounds(ContextErrors.scala:1071)
scala.tools.nsc.typechecker.Infer$Inferencer.issueBoundsError$1(Infer.scala:877)
scala.tools.nsc.typechecker.Infer$Inferencer.check$1(Infer.scala:883)
scala.tools.nsc.typechecker.Infer$Inferencer.checkBounds(Infer.scala:887)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.scala$tools$nsc$typechecker$RefChecks$RefCheckTransformer$$checkBounds(RefChecks.scala:1268)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transform(RefChecks.scala:1858)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transform(RefChecks.scala:114)
scala.reflect.internal.Trees.$anonfun$itransform$1(Trees.scala:1421)
scala.reflect.api.Trees$Transformer.atOwner(Trees.scala:2608)
scala.reflect.internal.Trees.itransform(Trees.scala:1420)
scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transform(RefChecks.scala:1953)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transformStat(RefChecks.scala:1251)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.$anonfun$transformStats$1(RefChecks.scala:1235)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transformStats(RefChecks.scala:1233)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transformStats(RefChecks.scala:114)
scala.reflect.internal.Trees.itransform(Trees.scala:1468)
scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transform(RefChecks.scala:1953)
scala.tools.nsc.typechecker.RefChecks$RefCheckTransformer.transform(RefChecks.scala:114)
scala.reflect.api.Trees$Transformer.transformTemplate(Trees.scala:2571)
scala.reflect.internal.Trees.$anonfun$itransform$5(Trees.scala:1477)

I suspect there are still some Scala version incompatibility issues here. I am using:

ThisBuild / scalaVersion := "2.12.19"
val flinkVersion = "1.15.2"

//  "org.flinkextended" %% "flink-scala-api" % "1.15.4_1.1.1",
  "org.flinkextended" % "flink-scala-api" % "1.1.4-SNAPSHOT" from "file://path-to-my-project/flink-scala-api_2.12-1.1.4-SNAPSHOT.jar",

It runs fine with version 1.15.4_1.1.1. I built the branch jar with the command sbt clean package, then took the jar and referenced it as above.
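
Maybe a cleaner alternative to the file:// reference (I have not tried it yet) would be to publish the branch locally and then depend on the snapshot coordinates, roughly:

// in the flink-scala-api checkout, publish for all cross Scala versions:
//   sbt +publishLocal
// then in the application's build.sbt, instead of the file:// reference:
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.1.4-SNAPSHOT"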

AlexLi-Monster commented 7 months ago

But in terms of the fix, I think it works as expected in the test (I just cannot verify with Flink locally). I also tested something like having a list of objects in the original form (this was the major issue when we were using POJOs) and tried it out with pure case classes. Works perfectly!

class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
  private val ti = deriveTypeInformation[Click]
  private val clicks = List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01"))

  ignore should "generate blob for event=click+purchase" in {
    val buffer          = new ByteArrayOutputStream()
    val eventSerializer = ti.createSerializer(null)

    eventSerializer.serialize(Click("p1", clicks), new DataOutputViewStreamWrapper(buffer))
    Files.write(Path.of("src/test/resources/click.dat"), buffer.toByteArray)
  }

  it should "decode click when we added view" in {
    val buffer = this.getClass.getResourceAsStream("/click.dat")
    val click  = ti.createSerializer(null).deserialize(new DataInputViewStreamWrapper(buffer))
    click shouldBe Click(id = "p1", clicks)
  }
}

object SchemaEvolutionTest {
//  sealed trait Event
//  case class Click(id: String, notInDatFile: String = "", active: Boolean = false) extends Event
//  case class Purchase(price: Double)                      extends Event
//  case class View(ts: Long)                               extends Event
  case class Click(id: String, inFileClicks: List[ClickEvent], notInFileActive: Boolean = true, notInFileString: String = "")
  case class ClickEvent(sessionId: String, date: String)
}

AlexLi-Monster commented 7 months ago

Ah, I found an interesting failing case (it also fails here, just as it did with POJOs): say I have a List of ClickEvent in the Click class and I want to evolve the schema of ClickEvent; the fields get shifted when deserialising, like below:

class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
  private val ti = deriveTypeInformation[Click]
  private val clicks = List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01"))

  ignore should "generate blob for event=click+purchase" in {
    val buffer          = new ByteArrayOutputStream()
    val eventSerializer = ti.createSerializer(null)

    eventSerializer.serialize(Click("p1", clicks), new DataOutputViewStreamWrapper(buffer))
    Files.write(Path.of("src/test/resources/click.dat"), buffer.toByteArray)
  }

  it should "decode click when we added view" in {
    val buffer = this.getClass.getResourceAsStream("/click.dat")
    val click  = ti.createSerializer(null).deserialize(new DataInputViewStreamWrapper(buffer))
    println(click)
    val evolvedClicks = List(ClickEvent("a", "2021-01-01", ""), ClickEvent("b", "2021-01-01", ""), ClickEvent("c", "2021-01-01", ""))
    click shouldBe Click(id = "p1", evolvedClicks)
  }
}

object SchemaEvolutionTest {
//  sealed trait Event
//  case class Click(id: String, notInDatFile: String = "", active: Boolean = false) extends Event
//  case class Purchase(price: Double)                      extends Event
//  case class View(ts: Long)                               extends Event
  case class Click(id: String, inFileClicks: List[ClickEvent])
  case class ClickEvent(sessionId: String, date: String, fieldNotInFile: String = "")
}

the id of ClickEvent "b" gets shifted into ClickEvent "a"'s new field.

Expected :Click("p1", List(ClickEvent("a", "2021-01-01", ""), ClickEvent("b", "2021-01-01", ""), ClickEvent("c", "2021-01-01", "")))
Actual   :Click("p1", List(ClickEvent("a", "2021-01-01", "b"), ClickEvent("2021-01-01", "c", "2021-01-01"), ClickEvent("2021-01-01", "c", "2021-01-01")))

I guess we should not evolve a class that is nested inside a List 🤔

novakov-alexey commented 6 months ago

@AlexLi-Monster The only solution I see to make this work is to add the class's constructor arity to the savepoint during serialization. This way we know how many arguments should be read upon deserialization. I have added a new commit to the WIP branch based on your test case. The downside of this solution is that with a new version of this library, a user can't restore Flink state from savepoints created by previous library versions, because that arity (an int) is not present in older savepoints.

This covers Scala 2.x only.
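
For readers following along, a rough sketch of that idea, assuming a product serializer that keeps one serializer per field and a constructor helper that can fall back to defaults (an illustration only, not the actual PR code):

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

class ArityPrefixedSketch[T <: Product](
    fieldSerializers: Array[TypeSerializer[AnyRef]],
    createWithDefaults: Array[AnyRef] => T // fills missing trailing parameters from defaults
) {
  def serialize(value: T, target: DataOutputView): Unit = {
    target.writeInt(value.productArity) // record how many fields the writer had
    value.productIterator.zip(fieldSerializers.iterator).foreach { case (field, ser) =>
      ser.serialize(field.asInstanceOf[AnyRef], target)
    }
  }

  def deserialize(source: DataInputView): T = {
    val writtenArity = source.readInt() // may be smaller than the current constructor arity
    val fields = (0 until writtenArity).map(i => fieldSerializers(i).deserialize(source)).toArray
    createWithDefaults(fields)
  }
}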

AlexLi-Monster commented 6 months ago

@novakov-alexey ,

Tested the fix in PR #98 and it looks good. This issue can be closed now.

Thanks, Alex

AlexLi-Monster commented 6 months ago

Also, as a side question, I need your suggestion, since our Flink is 1.13 and we are not able to use this library right now:

What is the best practice to define a serialiser for scala.Option or other collection types like List or Map in Flink 1.13? I have a POJO in Scala 2:

class Building(
  var buildTime: Option[Instant],
  var managers: List[String]
)

When I disable Kryo for generic type serialisation, I get the following error for the POJO above: "Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type".

I guess I need to define my own custom serialiser for these classes, but I failed to find a good example for it. Is org.apache.flinkx.api.serializer.OptionSerializer from this library something I can refer to?

novakov-alexey commented 6 months ago

> @novakov-alexey ,
>
> Tested the fix in PR #98 and it looks good. This issue can be closed now.
>
> Thanks, Alex

Thank you for testing. I have not found a solution for Scala 3 yet. I think we can limit the scope of this issue to Scala 2 and create a new issue for Scala 3 to track it separately. The challenge with Scala 3 is that its reflection API is completely different from Scala 2's.

TODO: exclude SchemaEvolutionTest from Scala 3 build for now.
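
One possible way to do that in the sbt build (a sketch of a common sbt pattern, not necessarily how it will be done here):

// skip SchemaEvolutionTest when the test sources are compiled with Scala 3
Test / unmanagedSources / excludeFilter := {
  val previous = (Test / unmanagedSources / excludeFilter).value
  if (scalaVersion.value.startsWith("3")) previous || "SchemaEvolutionTest.scala"
  else previous
}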

novakov-alexey commented 6 months ago

> Also, as a side question, I need your suggestion, since our Flink is 1.13 and we are not able to use this library right now: what is the best practice to define a serialiser for scala.Option or other collection types like List or Map in Flink 1.13? [...] Is org.apache.flinkx.api.serializer.OptionSerializer from this library something I can refer to?

Yeah, Kryo comes into play when Flink sees a type for which it cannot find a TypeInformation instance, so you need to bring the missing serializer yourself. There are no best practices for implementing your own serializers: just take the Flink API and implement the abstract methods. This library is a copy of the original Flink Scala API, so you can copy-paste org.apache.flinkx.api.serializer.OptionSerializer into your project, or look at the original Flink Scala API implementation and go from there.
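
To give an idea of the shape of such a serializer, below is a minimal sketch for Option[String] (hypothetical and hard-coded to String; the library's OptionSerializer is generic and delegates to a serializer for the wrapped type):

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

class StringOptionSerializer extends TypeSerializer[Option[String]] {
  override def isImmutableType: Boolean = true
  override def duplicate(): TypeSerializer[Option[String]] = this
  override def createInstance(): Option[String] = None
  override def copy(from: Option[String]): Option[String] = from
  override def copy(from: Option[String], reuse: Option[String]): Option[String] = from
  override def getLength: Int = -1 // variable length: one flag byte plus an optional UTF payload

  override def serialize(record: Option[String], target: DataOutputView): Unit = record match {
    case Some(value) => target.writeBoolean(true); target.writeUTF(value)
    case None        => target.writeBoolean(false)
  }

  override def deserialize(source: DataInputView): Option[String] =
    if (source.readBoolean()) Some(source.readUTF()) else None

  override def deserialize(reuse: Option[String], source: DataInputView): Option[String] =
    deserialize(source)

  override def copy(source: DataInputView, target: DataOutputView): Unit =
    serialize(deserialize(source), target)

  override def equals(obj: Any): Boolean = obj.isInstanceOf[StringOptionSerializer]
  override def hashCode(): Int = classOf[StringOptionSerializer].hashCode

  // a snapshot is required so that Flink can check savepoint compatibility on restore
  override def snapshotConfiguration(): TypeSerializerSnapshot[Option[String]] =
    new StringOptionSerializer.Snapshot
}

object StringOptionSerializer {
  class Snapshot
      extends SimpleTypeSerializerSnapshot[Option[String]](
        new java.util.function.Supplier[StringOptionSerializer] {
          override def get(): StringOptionSerializer = new StringOptionSerializer
        }
      )
}

Wiring it into Flink's type extraction still requires a matching TypeInformation (e.g. via a TypeInfoFactory), which is a separate piece of work.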

AlexLi-Monster commented 6 months ago

> Also, as a side question, I need your suggestion [...] Is org.apache.flinkx.api.serializer.OptionSerializer from this library something I can refer to?

> Yeah, Kryo comes into play when Flink sees a type for which it cannot find a TypeInformation instance, so you need to bring the missing serializer yourself. [...] You can copy-paste org.apache.flinkx.api.serializer.OptionSerializer into your project, or look at the original Flink Scala API implementation and go from there.

Thanks for your good hint. I will give it a go.

Cheers.

AlexLi-Monster commented 6 months ago

I am closing this issue as the fix for Scala 2 is in: https://github.com/flink-extended/flink-scala-api/pull/98