flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API which supports Scala types for serialisation as well as the latest Scala version
Apache License 2.0

Not being able to restore stateful operator #96

Closed buinauskas closed 6 months ago

buinauskas commented 6 months ago

I've encountered an issue when restoring a job from a savepoint. It happens when I need to make changes to an already existing job. The procedure is as follows:

  1. Stop the job and take a canonical savepoint
  2. Redeploy the job by providing the savepoint path
  3. Get the following exception

We're running Flink cluster in Kubernetes in session mode.

2024-02-29 09:20:39
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
    at com.vinted.search.readmodel.translations.TranslationBuilder.open(TranslationBuilder.scala:18)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
    at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@ad4d1cde) must not be incompatible with the old state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@60d01b87).
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:247)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:326)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:313)
    at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 15 more

The classes I'm using as a state:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class TranslationKey(
  id: Long,
  entry_type: Long,
  entry_id: Long
)

object TranslationKey {
  implicit val typeInformation: TypeInformation[TranslationKey] = deriveTypeInformation
}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class TranslationValue(
  id: Long,
  translation_key_id: Long,
  locale: String,
  value: String
)

object TranslationValue {
  implicit val typeInformation: TypeInformation[TranslationValue] = deriveTypeInformation
}

The operator declares and uses the state as follows. I've omitted the implementation details because they don't matter:

import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

class TranslationBuilder extends RichCoFlatMapFunction[TranslationKey, TranslationValue, Translation] {
  private var translationKeyState: ValueState[TranslationKey] = _
  private var translationValuesState: MapState[Long, TranslationValue] = _

  override def open(parameters: Configuration): Unit = {
    translationKeyState = getRuntimeContext.getState(new ValueStateDescriptor("keys", TranslationKey.typeInformation))
    translationValuesState = getRuntimeContext.getMapState(new MapStateDescriptor("values", longInfo, TranslationValue.typeInformation))
  }

  override def flatMap1(value: TranslationKey, out: Collector[Translation]): Unit = ???

  override def flatMap2(value: TranslationValue, out: Collector[Translation]): Unit = ???
}

Is there something I'm doing wrong? This works with the deprecated Scala API. The exception also says that the issue might be with the heap state backend, which I'm using in production. If switching to RocksDB would solve the issue, I'd be glad to make the switch.

AlexLi-Monster commented 6 months ago

Hi,

Just checking: in your step 2, did you evolve the TranslationKey or TranslationValue case class before redeploying?

buinauskas commented 6 months ago

Hi @AlexLi-Monster, no, it's the same codebase recompiled and redeployed. I'm in the process of migrating from the deprecated Scala API and am testing various scenarios.

novakov-alexey commented 6 months ago

@buinauskas When you say "I need to make some changes to the already existing job", does it mean you switched the Flink TypeInformation instances from the Apache Flink Scala API to this library's Scala API?

The official Flink Scala API (already deprecated) stores its own serializers in a savepoint/checkpoint, which can NOT be read by this library because each library uses its own code to de/serialize. That is why you see different hash codes in the error message.

Could you check whether this article explains the problem and offers a solution? https://ververica.zendesk.com/hc/en-us/articles/10627965111068-How-to-migrate-a-savepoint-created-using-Apache-Flink-Scala-API-to-another-format

buinauskas commented 6 months ago

Hey, the change is not from one API to another.

The job is bootstrapped using this library's Scala API and there are no changes to the type information.

This exact exception happened after literally recompiling the same project and redeploying it to the same cluster from the snapshot.

I suspect the issue is related to class loading and the fact that we run in session mode.

I'll go through your suggested article to see if I'm missing something, thank you.

novakov-alexey commented 6 months ago

Hm... this could indeed be related to different classes being loaded in your Flink job/cluster runtime. Something in the environment definitely changed and led to a slightly different ScalaCaseClassSerializer object.

It is an interesting case to solve and share with the community. Let us know if you find the root cause.

novakov-alexey commented 6 months ago

@buinauskas you might want to review this documentation on class loading in Flink:

I would double-check whether the session cluster is pre-built with different Scala dependencies than your job jar (uber jar?).
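One way to run that check from inside the job is to log which class loader and which jar a given class was actually loaded from. The snippet below is only a minimal sketch: `WhereLoaded` and `describe` are hypothetical names introduced for illustration, and it relies on nothing but standard JDK reflection calls.

```scala
// Hypothetical diagnostic helper; not part of Flink or flink-scala-api.
object WhereLoaded {

  // Reports the defining class loader and the code source (jar or directory) of a class.
  def describe(cls: Class[_]): String = {
    // getClassLoader returns null for classes defined by the bootstrap loader.
    val loader = Option(cls.getClassLoader).map(_.toString).getOrElse("bootstrap")
    // The code source (or its location) can be null, e.g. for JDK classes.
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("unknown")
    s"${cls.getName} loader=$loader source=$source"
  }

  def main(args: Array[String]): Unit = {
    // A JDK class and this object's own class, just to show both kinds of output.
    println(describe(classOf[String]))
    println(describe(WhereLoaded.getClass))
  }
}
```

In a job, the same call could be made from `open()` with the serializer class named in the stack trace (`org.apache.flinkx.api.serializer.ScalaCaseClassSerializer`). A source under `/opt/flink/lib` rather than the job jar would suggest the class comes from the cluster image.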

buinauskas commented 6 months ago

I came to the same conclusion yesterday but held off on updating this issue. My session cluster is pre-built with:

FLINK_SCALA_API_VERSION=1.18.1_1.1.3
MAGNOLIA_VERSION=1.1.8

#Flink scala api
curl https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_$SCALA_MAJOR_VERSION/$FLINK_SCALA_API_VERSION/flink-scala-api_${SCALA_MAJOR_VERSION}-${FLINK_SCALA_API_VERSION}.jar --fail --output /opt/flink/lib/flink-scala-api_$SCALA_MAJOR_VERSION-${FLINK_SCALA_API_VERSION}.jar
curl https://repo1.maven.org/maven2/com/softwaremill/magnolia1_2/magnolia_$SCALA_MAJOR_VERSION/$MAGNOLIA_VERSION/magnolia_$SCALA_MAJOR_VERSION-$MAGNOLIA_VERSION.jar --fail --output /opt/flink/lib/magnolia_$SCALA_MAJOR_VERSION-$MAGNOLIA_VERSION.jar

Packaging only the flink-extended library has helped with the issue:

implementation "org.flinkextended:flink-scala-api_$scalaMajorVersion:$flinkScalaApiVersion"
flinkShadowJar("org.flinkextended:flink-scala-api_$scalaMajorVersion:$flinkScalaApiVersion") {
    exclude group: 'org.apache.flink'
    exclude group: 'org.scalameta'
    exclude group: 'org.scala-lang'
}

But now there's a similar issue: when I start the job, classes are not sent over the wire correctly. I suspect a similar cause and will investigate further.

novakov-alexey commented 6 months ago

Fascinating. This issue may be related to the Flink serialization approach itself. I guess that because two different class loaders (system classloader and user classloader) are involved in this situation, the recovery procedure results in a different serializer object, so Flink cannot read the savepoint because it thinks the provided serializer is unknown.
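The underlying JVM behaviour can be reproduced without Flink: the same class file loaded through two independent class loaders yields two distinct `Class` objects, even though the names match. The following is a rough sketch of that effect only, not of Flink's restore logic. `Probe` and `ClassLoaderIdentityDemo` are hypothetical names, and the sketch assumes it runs as a compiled application so that the class has a code source location (this may not hold in a REPL).

```scala
import java.net.URLClassLoader

// Hypothetical stand-in for a class that is visible to more than one class loader.
class Probe

object ClassLoaderIdentityDemo {

  // Loads Probe twice, each time through a fresh loader with no application parent,
  // so each loader defines its own copy of the class.
  def loadTwice(): (Class[_], Class[_]) = {
    val name = classOf[Probe].getName
    // Directory or jar that Probe was compiled into (assumed to be non-null here).
    val location = classOf[Probe].getProtectionDomain.getCodeSource.getLocation
    def isolated(): Class[_] =
      // parent = null: delegate only to the bootstrap loader.
      new URLClassLoader(Array(location), null).loadClass(name)
    (isolated(), isolated())
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = loadTwice()
    println(s"same name:  ${a.getName == b.getName}")
    println(s"same class: ${a == b}")
    println(s"loaders:    ${a.getClassLoader} vs ${b.getClassLoader}")
  }
}
```

If a serializer's compatibility check depends on class identity, a class resolved from `/opt/flink/lib` on one side and from the job jar on the other could plausibly fail it in the same way. That part remains a hypothesis in this thread.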

buinauskas commented 6 months ago

Indeed that's what I also suspect, except I was calling this anything but fascinating.

I'll close the issue because it is not related to this library. Thank you for pointing me in the right direction 🙏