streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0

Flink 1.11.3 is not able to restore with savepoint taken with Flink 1.9.3 #256

Closed. MSShravan closed this issue 3 years ago.

MSShravan commented 3 years ago

Steps to reproduce this issue:

  1. Deploy Flink 1.9.3 with pulsar-flink connector 2.12-2.4.28.1 and run a simple pipeline with Pulsar source, map operation and sink nodes.
  2. Take a savepoint of the above Flink job.
  3. Upgrade Flink to 1.11.3 and the pulsar-flink connector from 2.12-2.4.28.1 to 2.12-1.11-2.5.6.1, and let the job restore from the savepoint taken in step 2. The error below is observed, and the Flink job goes into the RESTARTING state.

2021-02-22 15:51:29
org.apache.flink.util.StateMigrationException: The new state typeSerializer for operator state must not be incompatible.
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:298)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219)
    at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.initializeState(FlinkPulsarSource.java:637)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)

According to the Flink documentation, restoring a savepoint taken with 1.9.3 on 1.11.3 is supported.

jianyun8023 commented 3 years ago

I wrote a sample to migrate the state, but it didn't behave as I expected; finding a new solution may take some time.

jianyun8023 commented 3 years ago

Could you provide the checkpoint data? I can't reproduce the problem with your environment right now. When I upgrade from 1.9.3 to 1.11.0 or 1.11.3, I get the following error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more
MSShravan commented 3 years ago

Hi @Jianyun Zhao, I tried the jar you provided, but it doesn't seem to work; I'm getting the error below:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource; local class incompatible: stream classdesc serialVersionUID = 6092119106917196776, local class serialVersionUID = 1180113614479873613
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
    ... 6 more

I have attached the savepoint taken with Flink 1.9.3: savepoint-8bf26e-b0b88ab9b0bb.zip

jianyun8023 commented 3 years ago

Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource; local class incompatible: stream classdesc serialVersionUID = 6092119106917196776, local class serialVersionUID = 1180113614479873613
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)

This error is most likely caused by more than one version of the pulsar-flink connector being present in your Flink job; please check your classpath.
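The InvalidClassException compares the serialVersionUID recorded when FlinkPulsarSource was serialized into the job graph with the serialVersionUID of the class the JVM loads at runtime. A minimal, JDK-only sketch for checking which copy of a class is actually loaded and what its serialVersionUID is; run it with the same classpath as the job. The class and method names here are hypothetical helpers, and the default class name is simply the one from the exception.

```java
import java.io.ObjectStreamClass;
import java.net.URL;
import java.security.CodeSource;

public class SerialVersionCheck {

    /** Returns the serialVersionUID Java serialization uses for the class. */
    static long serialVersionUidOf(Class<?> clazz) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(clazz);
        if (desc == null) { // lookup returns null for non-Serializable classes
            throw new IllegalArgumentException(clazz.getName() + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    /** Returns the jar or directory the class was loaded from, or null for JDK bootstrap classes. */
    static URL locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        return source == null ? null : source.getLocation();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults to the class named in the exception; pass another class name to check it instead.
        String className = args.length > 0
                ? args[0]
                : "org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource";
        // initialize=false: load the class without running its static initializers.
        Class<?> clazz = Class.forName(className, false, SerialVersionCheck.class.getClassLoader());
        System.out.println(className);
        System.out.println("  serialVersionUID = " + serialVersionUidOf(clazz));
        System.out.println("  loaded from      = " + locationOf(clazz));
    }
}
```

If the printed location is not the connector jar you expect, or the UID differs between the machine that builds the job and the task managers, another copy of the connector is shadowing the intended one.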

MSShravan commented 3 years ago

The jars had not been downloaded properly; I have resolved that. Now I'm seeing the error below:

2021-03-10 17:15:18.921 [Source: coda_cpu_src -> (coda_cpu_src_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> pulsar_TimestampTransform.Transform -> (Transform_transformation, JsonNode_to_String -> (coda_cpu_sink_monitoring, Sink: coda_cpu_sink))) (6/8)] WARN org.apache.flink.runtime.taskmanager.Task - Source: coda_cpu_src -> (coda_cpu_src_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> pulsar_TimestampTransform.Transform -> (Transform_transformation, JsonNode_to_String -> (coda_cpu_sink_monitoring, Sink: coda_cpu_sink))) (6/8) (0ea31177c1cb9ad92296054ddd0bf6a7) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_6ae2e79afadd77f926d57cdd7bfa1e1b_(6/8) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 common frames omitted
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)

jianyun8023 commented 3 years ago

Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)

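This problem occurs when Flink reads the savepoint and fails to deserialize the MessageId. Upgrading the Pulsar client from version 2.5 to 2.6 added a batchSize field to BatchMessageIdImpl, and Kryo's FieldSerializer reads fields by position according to the class currently on the classpath, so it tries to read an int that the old savepoint never wrote.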

Pulsar PIP-54 Support acknowledgment at batch index level (#6052)

public class BatchMessageIdImpl extends MessageIdImpl {
    private final static int NO_BATCH = -1;
    private final int batchIndex;
+    private final int batchSize;

    private final transient BatchMessageAcker acker;
}
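To illustrate why one added int field ends in "No more bytes left.", the sketch below imitates a positional field layout with plain DataOutputStream/DataInputStream rather than Kryo itself: fields are written back to back with no names or version, then read back expecting one extra int. The field list is a simplified assumption (ledgerId, entryId and partitionIndex are assumed from MessageIdImpl; batchIndex and batchSize come from the diff above), and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class PositionalLayoutDemo {

    /** Writes fields in the OLD order, before batchSize existed (simplified, hypothetical layout). */
    static byte[] writeOldLayout(long ledgerId, long entryId, int partitionIndex, int batchIndex)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(ledgerId);
            out.writeLong(entryId);
            out.writeInt(partitionIndex);
            out.writeInt(batchIndex);
        }
        return bytes.toByteArray();
    }

    /** Reads fields in the NEW order, which expects one more int at the end. */
    static String readNewLayout(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long ledgerId = in.readLong();
            long entryId = in.readLong();
            int partitionIndex = in.readInt();
            int batchIndex = in.readInt();
            int batchSize = in.readInt(); // the new field; the old bytes end before it
            return ledgerId + ":" + entryId + ":" + partitionIndex + ":" + batchIndex + ":" + batchSize;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] oldState = writeOldLayout(10L, 20L, 0, 3);
        System.out.println("old state size: " + oldState.length + " bytes");
        try {
            System.out.println("restored: " + readNewLayout(oldState));
        } catch (EOFException e) {
            // Same failure mode as "java.io.EOFException: No more bytes left." in the traces above.
            System.out.println("restore failed: " + e);
        }
    }
}
```

With a single record the reader hits the end of the stream immediately; with several records in a list it would first misalign, reading part of the next record as batchSize, which is arguably worse because it can fail silently.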

To cope with this problem, a dedicated MessageId serializer, instead of the generic Kryo serializer seen in the trace, would be a good solution.
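What the connector's serializer would look like is not shown in this thread. As a sketch of the general idea only, the hypothetical `VersionedMessageIdCodec` below writes an explicit format version before the fields, so bytes written before batchSize existed can still be decoded, with a default filled in. It uses plain java.io streams, not Flink's TypeSerializer API; the field set and the default of -1 (borrowed from NO_BATCH in the diff) are assumptions. A real serializer might instead store the bytes from Pulsar's `MessageId.toByteArray()` and rebuild them with `MessageId.fromByteArray(byte[])`, whose protobuf-based encoding is likely to tolerate added fields better than a positional layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedMessageIdCodec {

    static final int VERSION_1 = 1; // layout written before batchSize existed
    static final int VERSION_2 = 2; // current layout, includes batchSize
    static final int NO_BATCH = -1; // assumed default when batchSize is unknown

    /** Hypothetical, simplified view of a message id kept in source state. */
    static final class MessageIdState {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;
        final int batchIndex;
        final int batchSize;

        MessageIdState(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
        }
    }

    /** Writes the version tag followed by the fields shared by both layouts. */
    private static void writeCommon(DataOutputStream out, int version, MessageIdState id)
            throws IOException {
        out.writeInt(version);
        out.writeLong(id.ledgerId);
        out.writeLong(id.entryId);
        out.writeInt(id.partitionIndex);
        out.writeInt(id.batchIndex);
    }

    /** Produces version-1 bytes, standing in for state taken before the upgrade. */
    static byte[] writeV1(MessageIdState id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeCommon(out, VERSION_1, id);
        }
        return bytes.toByteArray();
    }

    /** Produces bytes in the current (version-2) layout. */
    static byte[] write(MessageIdState id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeCommon(out, VERSION_2, id);
            out.writeInt(id.batchSize);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout; version-1 data gets NO_BATCH as its batchSize. */
    static MessageIdState read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION_1 && version != VERSION_2) {
                throw new IOException("Unknown MessageIdState version: " + version);
            }
            long ledgerId = in.readLong();
            long entryId = in.readLong();
            int partitionIndex = in.readInt();
            int batchIndex = in.readInt();
            int batchSize = (version == VERSION_2) ? in.readInt() : NO_BATCH;
            return new MessageIdState(ledgerId, entryId, partitionIndex, batchIndex, batchSize);
        }
    }

    public static void main(String[] args) throws IOException {
        MessageIdState id = new MessageIdState(10L, 20L, 0, 3, 5);
        System.out.println("from v1 bytes, batchSize = " + read(writeV1(id)).batchSize);
        System.out.println("from v2 bytes, batchSize = " + read(write(id)).batchSize);
    }
}
```

The version tag costs four bytes per entry but turns a future layout change into a new `case` in `read` rather than a failed restore.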

To migrate the state smoothly, I suggest releasing a new 1.9 connector that upgrades the checkpoint data, and then switching back to the Flink 1.11 connector.

@MSShravan What do you think?

jianyun8023 commented 3 years ago

I have split the bug where savepoint state does not migrate properly into three issues:

- #260 upgrade flink 1.9 source state (withdrawn; I think the migration issue should be solved in the new version)
- #261 upgrade flink 1.11 source state
- #262 Optimize the data structure of state in Source

jianyun8023 commented 3 years ago

There is now a viable migration solution, which I verified locally. Could you check whether it works for you?

You need to build the code on the fix-state-for-flink-1.11 branch:

git clone https://github.com/streamnative/pulsar-flink.git -b fix-state-for-flink-1.11
cd pulsar-flink
mvn install -DskipTests -Pscala-2.12

Then you will find two jar packages in pulsar-flink-connector/target:

- pulsar-flink-connector-2.12-1.11-${version}-SNAPSHOT.jar: contains the required dependencies, including pulsar-client-all 2.7.0.
- original-pulsar-flink-connector-2.12-1.11-${version}-SNAPSHOT.jar: contains only the connector code, without the pulsar-client code.

The migration is divided into two parts.

Raven888888 commented 2 years ago

Hi @jianyun8023, I am facing the same issue, though I'm not sure it has exactly the same root cause (happy to open a new ticket if it doesn't).

Versions:

- Flink: 1.12.1
- Pulsar: 2.7.0
- pulsar-client: 2.5.2
- pulsar-flink-connector-2.11-1.12: 2.7.0

With this setup, I created Flink checkpoints without any problems.

However, because Bintray shut down earlier this year, I had to upgrade pulsar-flink-connector-2.11-1.12 to 2.7.6, since that is the version available on the Maven repository.

The Flink job is unable to restore from the checkpoint. Error logs:

2021-10-29 04:01:21,783 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job ALC-Kafka (85b31724ce21c421899952c202ac4601) switched from state RESTARTING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=10000)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_292]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_292]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.util.StateMigrationException: The new state typeSerializer for operator state must not be incompatible.
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:300) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.tryMigrateState(FlinkPulsarSource.java:789) ~[?:?]
        at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.initializeState(FlinkPulsarSource.java:736) ~[?:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

Any pointer is appreciated!