confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Consider making retries in CommandRunner configurable #7797

Open guozhangwang opened 3 years ago

guozhangwang commented 3 years ago

Today we always pass Integer.MAX_VALUE as maxRetries all the way down to CommandRunner, where it is used in executeStatement:

RetryUtil.retryWithBackoff(
    maxRetries,
    STATEMENT_RETRY_MS,
    MAX_STATEMENT_RETRY_MS,
    () -> statementExecutor.handleRestore(command),
    WakeupException.class
);

The assumption behind this is that if a cmd was successfully written to the cmd topic, then it is 100% valid and safe, and its execution should not fail. However, there are scenarios where the execution fails consistently (see the example stack trace below), in which case retrying indefinitely is not the right approach.

[2021-05-17 16:40:42,105] ERROR Exception encountered running command: Failed to construct kafka consumer. Retrying in 100 ms (io.confluent.ksql.util.RetryUtil:106)
[2021-05-17 16:40:42,105] ERROR Stack trace: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:304)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:772)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:583)
    at io.confluent.ksql.query.KafkaStreamsBuilderImpl.buildKafkaStreams(KafkaStreamsBuilderImpl.java:42)
    at io.confluent.ksql.query.QueryExecutor.buildQuery(QueryExecutor.java:219)
    at io.confluent.ksql.engine.EngineExecutor.executePersistentQuery(EngineExecutor.java:358)
    at io.confluent.ksql.engine.EngineExecutor.lambda$execute$1(EngineExecutor.java:109)
    at java.base/java.util.Optional.map(Optional.java:265)
    at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:109)
    at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:193)
    at io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor.executePlan(InteractiveStatementExecutor.java:241)
    at io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor.handleStatementWithTerminatedQueries(InteractiveStatementExecutor.java:194)
    at io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor.handleStatement(InteractiveStatementExecutor.java:119)
    at io.confluent.ksql.rest.server.computation.CommandRunner.lambda$executeStatement$3(CommandRunner.java:237)
    at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:89)
    at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:60)
    at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:41)
    at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:243)
    at io.confluent.ksql.rest.server.computation.CommandRunner.fetchAndRunCommands(CommandRunner.java:226)
    at io.confluent.ksql.rest.server.computation.CommandRunner$Runner.run(CommandRunner.java:295)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.KafkaException: Class io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor cannot be found
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:443)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:424)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:411)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:708)
    ... 25 more
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:374)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:363)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:372)
    ... 29 more
 (io.confluent.ksql.util.RetryUtil:110)

If we cannot guarantee that the above assumption always holds, then we should consider making the number of retries configurable. When the retries are exhausted, we would treat it as a fatal error and handle it (e.g. delete the corresponding cmd from the topic and report it to the client).
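
To make the proposal concrete, here is a minimal sketch of a bounded retry with exponential backoff that surfaces a fatal error once the budget is spent. BoundedRetry and RetriesExhaustedException are hypothetical names for illustration, not existing ksqlDB classes, and the trailing exception-class argument from the snippet above is left out for brevity. maxRetries is assumed to mean "retries after the first attempt".

```java
/** Hypothetical sketch: retry an action a bounded number of times with backoff. */
final class BoundedRetry {

  /** Thrown once the retry budget is spent; wraps the last failure. */
  static final class RetriesExhaustedException extends RuntimeException {
    RetriesExhaustedException(final int attempts, final Throwable cause) {
      super("Command failed after " + attempts + " attempts", cause);
    }
  }

  private BoundedRetry() {
  }

  /**
   * Runs the action, retrying up to maxRetries times after the first failure.
   * The wait doubles after each failure and is capped at maxWaitMs.
   */
  static void retryWithBackoff(
      final int maxRetries,
      final long initialWaitMs,
      final long maxWaitMs,
      final Runnable action
  ) {
    long waitMs = initialWaitMs;
    int failures = 0;
    while (true) {
      try {
        action.run();
        return;
      } catch (final RuntimeException e) {
        failures++;
        if (failures > maxRetries) {
          // Budget exhausted: let the caller decide how to handle the fatal error.
          throw new RetriesExhaustedException(failures, e);
        }
        sleep(waitMs);
        waitMs = Math.min(waitMs * 2, maxWaitMs);
      }
    }
  }

  private static void sleep(final long ms) {
    try {
      Thread.sleep(ms);
    } catch (final InterruptedException e) {
      // Preserve the interrupt flag and stop retrying.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting to retry", e);
    }
  }
}
```

With this shape, the caller in CommandRunner would catch RetriesExhaustedException and decide what "fatal" means (mark the command as failed, report it to the client, and so on). Passing Integer.MAX_VALUE as maxRetries keeps today's effectively unbounded behavior.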

guozhangwang commented 3 years ago

Context from slack: https://confluent.slack.com/archives/C9Z794XSL/p1626364202430400

Related issue: https://github.com/confluentinc/ksql/issues/7622

rodesai commented 3 years ago

copying over my thoughts from slack:

We shouldn't make max-retries configurable - if you skip a command you'll wind up with a different set of streams and tables than you should have. Instead we should make the replay more robust by either:

Of these I think the first option is simpler to do.

Also, in either case we should give the user some way to explicitly ask for a given command to be skipped. This should be something they configure explicitly for that command, and they should understand that they may wind up with a totally different set of streams/tables if they do this. It could either be a list of offsets to skip, or we just give them a tool to truncate the command topic.
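
A rough sketch of the "list of offsets to skip" idea, assuming the list arrives as a comma-separated string from some config. CommandSkipList, QueuedCommand, and the config format are all hypothetical and only illustrate the filtering step during replay; nothing here is an existing ksqlDB API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: drop explicitly listed command-topic offsets during replay. */
final class CommandSkipList {

  /** Minimal stand-in for a command read from the command topic. */
  static final class QueuedCommand {
    final long offset;
    final String statement;

    QueuedCommand(final long offset, final String statement) {
      this.offset = offset;
      this.statement = statement;
    }
  }

  private final Set<Long> offsetsToSkip;

  CommandSkipList(final Set<Long> offsetsToSkip) {
    this.offsetsToSkip = new HashSet<>(offsetsToSkip);
  }

  /** Parses a comma-separated list of offsets, e.g. "12, 40". Blank entries are ignored. */
  static CommandSkipList parse(final String csv) {
    final Set<Long> offsets = new HashSet<>();
    for (final String part : csv.split(",")) {
      final String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        offsets.add(Long.parseLong(trimmed));
      }
    }
    return new CommandSkipList(offsets);
  }

  boolean shouldSkip(final long offset) {
    return offsetsToSkip.contains(offset);
  }

  /** Returns the commands to execute, in their original order, minus the skipped offsets. */
  List<QueuedCommand> filter(final List<QueuedCommand> commands) {
    final List<QueuedCommand> kept = new ArrayList<>();
    for (final QueuedCommand command : commands) {
      if (!shouldSkip(command.offset)) {
        kept.add(command);
      }
    }
    return kept;
  }
}
```

Keying the skip list on offsets rather than statement text keeps the opt-out tied to one specific command, which matches the requirement that the user asks for each skip explicitly.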