apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Failed to restore state on Flink runner #32000

Open rubensacoustic opened 1 month ago

rubensacoustic commented 1 month ago

What happened?

Sometimes I run into issues when Flink tries to restore from a previous state (using a savepoint).

I'm not sure if this is the right place to report it, but it seems to be related to the schema coder (`SchemaCoder` appears in the innermost stack frames).

This is the exception I'm getting:

org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1605)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1580)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1734)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$24(StreamTask.java:1723)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.}
    ... 15 more
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
    at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:206)
    at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:233)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1165)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1134)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:1128)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:292)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1732)
    ... 14 more
Caused by: java.lang.RuntimeException: Error reading state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:652)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:657)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected list element deserialization failure
    at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:89)
    at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeList(ListDelimitedSerializer.java:51)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:629)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:657)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
    at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:206)
    at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:233)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1165)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1134)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:1128)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:292)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1732)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$24(StreamTask.java:1723)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: varint overflow 1721980800000
    at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:106)
    at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
    at org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:431)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$IzJYxo36.decode(Unknown Source)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$IzJYxo36.decode(Unknown Source)
    at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:116)
    at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:82)
    ... 32 more

This does not happen on every restore. It seems to be a random error, or maybe a race condition?
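The number in the innermost exception may be a useful hint. Here is a minimal sketch, using only the JDK, that checks two things about it: whether it could be a valid length prefix for a string, and what it looks like when read as milliseconds since the Unix epoch. The class and method names are made up for illustration. The 32-bit range check is a simplified stand-in inferred from the `VarInt.decodeInt` frame, not Beam's actual code.

```java
import java.time.Instant;

// Sketch only: a simplified stand-in for the range check, not Beam's actual VarInt code.
class VarintOverflowValue {
    // Value copied from the "varint overflow" message in the stack trace.
    static final long REPORTED = 1721980800000L;

    // True when a decoded varint fits in an unsigned 32-bit length prefix (assumed limit).
    static boolean fitsInLengthPrefix(long decoded) {
        return decoded >= 0 && decoded < (1L << 32);
    }

    // Interprets the value as milliseconds since the Unix epoch.
    static Instant asEpochMillis(long value) {
        return Instant.ofEpochMilli(value);
    }

    public static void main(String[] args) {
        System.out.println(fitsInLengthPrefix(REPORTED)); // prints false
        System.out.println(asEpochMillis(REPORTED));      // prints 2024-07-26T08:00:00Z
    }
}
```

The value is far too large to be a string length, and it lands exactly on a round hour. That would be consistent with `StringUtf8Coder` reading bytes that were written for a timestamp-like long field, although this is only one possible reading of the trace.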

Apache Beam: 2.57.0. Flink: tested with 1.15 and 1.18, same results.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

rubensacoustic commented 1 month ago

Another thing that may or may not influence this: I'm using AutoValueSchema.
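To illustrate why an inferred schema could matter here, below is a toy, JDK-only sketch of what happens when a record is written with one field order and read back with another. It is not Beam's `RowCoder`. The two-field record (a long timestamp and a string name), the value `"sensor-1"`, and all class and method names are hypothetical. Whether the field order of my schema really differs between the job that wrote the savepoint and the job that restores it is an assumption I have not verified.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical toy codec illustrating a field-order mismatch; not Beam's RowCoder.
class FieldOrderMismatch {
    // Writes a long as a base-128 varint, low 7 bits first.
    static void writeVarLong(long v, ByteArrayOutputStream out) {
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    // Reads a base-128 varint back into a long.
    static long readVarLong(InputStream in) throws IOException {
        long result = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) throw new IOException("unexpected end of stream");
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
            shift += 7;
            if (shift >= 64) throw new IOException("varint too long");
        }
    }

    // Strings are written as a varint byte length followed by UTF-8 bytes.
    static void writeString(String s, ByteArrayOutputStream out) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeVarLong(bytes.length, out);
        out.write(bytes, 0, bytes.length);
    }

    static String readString(InputStream in) throws IOException {
        long len = readVarLong(in);
        if (len < 0 || len > Integer.MAX_VALUE) {
            throw new IOException("varint overflow " + len);
        }
        return new String(in.readNBytes((int) len), StandardCharsets.UTF_8);
    }

    // Writer's field order: timestamp first, then name.
    static byte[] encodeTimestampThenName(long timestampMillis, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarLong(timestampMillis, out);
        writeString(name, out);
        return out.toByteArray();
    }

    // Reader's field order: name first, then timestamp.
    static String decodeNameThenTimestamp(byte[] data) throws IOException {
        InputStream in = new ByteArrayInputStream(data);
        String name = readString(in);
        long timestampMillis = readVarLong(in);
        return name + "@" + timestampMillis;
    }

    public static void main(String[] args) {
        byte[] bytes = encodeTimestampThenName(1721980800000L, "sensor-1");
        try {
            System.out.println(decodeNameThenTimestamp(bytes));
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints: varint overflow 1721980800000
        }
    }
}
```

In this toy version the reader takes the timestamp bytes for a string length and fails with the same kind of message as in my stack trace. If the schema inferred for my AutoValue class can list fields in a different order from one run to the next, that might also explain why the failure only shows up on some restores.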