tchiotludo / akhq

Kafka GUI for Apache Kafka to manage topics, topics data, consumers group, schema registry, connect and more...
https://akhq.io/
Apache License 2.0
3.4k stars, 655 forks

Healthcheck does not fail when threads are killed by exceptions #1375

Open rouke-broersma opened 1 year ago

rouke-broersma commented 1 year ago

For example:

java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.akhq.controllers.GroupController.lambda$filterByTopics$0(GroupController.java:103)
    at java.base/java.util.Optional.map(Unknown Source)
    at org.akhq.controllers.GroupController.filterByTopics(GroupController.java:98)
    at org.akhq.controllers.$GroupController$Definition$Exec.dispatch(Unknown Source)
    at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378)
    at io.micronaut.context.DefaultBeanContext$4.invoke(DefaultBeanContext.java:594)
    at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:303)
    at io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:111)
    at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:103)
    at io.micronaut.http.server.RouteExecutor.lambda$executeRoute$14(RouteExecutor.java:659)
    at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:49)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
    at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at io.micrometer.core.instrument.composite.CompositeTimer.recordCallable(CompositeTimer.java:77)
    at io.micrometer.core.instrument.Timer.lambda$wrap$1(Timer.java:162)
    at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedCallable.call(InvocationInstrumenterWrappedCallable.java:53)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.akhq.utils.Logger.call(Logger.java:44)
    at org.akhq.modules.AbstractKafkaWrapper.listConsumerGroups(AbstractKafkaWrapper.java:169)
    at org.akhq.modules.$KafkaWrapperRequestScope$Definition$Exec.dispatch(Unknown Source)
    at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378)
    at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:128)
    at org.akhq.modules.$KafkaWrapperRequestScope$Definition$Intercepted.listConsumerGroups(Unknown Source)
    at org.akhq.repositories.ConsumerGroupRepository.all(ConsumerGroupRepository.java:48)
    at org.akhq.repositories.ConsumerGroupRepository.findByTopics(ConsumerGroupRepository.java:125)
    at org.akhq.controllers.GroupController.lambda$filterByTopics$0(GroupController.java:101)
    ... 22 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$24.handleFailure(KafkaAdminClient.java:3395)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.handleTimeoutFailure(KafkaAdminClient.java:853)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.access$2100(KafkaAdminClient.java:745)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.enqueue(KafkaAdminClient.java:1459)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.call(KafkaAdminClient.java:1478)
    at org.apache.kafka.clients.admin.KafkaAdminClient.listConsumerGroups(KafkaAdminClient.java:3322)
    at org.apache.kafka.clients.admin.Admin.listConsumerGroups(Admin.java:914)
    at org.akhq.modules.AbstractKafkaWrapper.listConsumerGroups(AbstractKafkaWrapper.java:170)
    ... 29 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

The UI then shows this message: [image]

However, the pods report as being perfectly happy: [image]

A restart of the pod fixes the issue. Kafka itself is not unavailable; only AKHQ has trouble reconnecting (probably because the background thread that is supposed to do this has crashed and is not being restarted).

A functioning health check would have brought us back online automatically; instead, we have to restart the pods manually. We are using your Helm chart. Are we missing a configuration option that makes the health check behave as expected?
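To illustrate the behaviour described above, here is a minimal sketch of a liveness probe that reports DOWN whenever a cheap cluster call fails or times out. It is not AKHQ's actual health check: the class `KafkaHealthProbe`, its `Status` enum, and the `clusterCall` parameter are hypothetical names. The sketch uses only the JDK; in a real deployment, `clusterCall` would presumably wrap an AdminClient request such as `describeCluster().nodes().get()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical liveness probe; an illustration, not part of AKHQ. */
final class KafkaHealthProbe {
    enum Status { UP, DOWN }

    // Assumption: stands in for a cheap Kafka admin call.
    private final Callable<?> clusterCall;
    private final long timeoutMillis;

    KafkaHealthProbe(Callable<?> clusterCall, long timeoutMillis) {
        this.clusterCall = clusterCall;
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns UP only if the call completes normally within the timeout. */
    Status check() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "kafka-health-probe");
            t.setDaemon(true); // never keep the JVM alive for a probe
            return t;
        });
        try {
            Future<?> result = executor.submit(clusterCall);
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Status.UP;
        } catch (Exception e) {
            // ExecutionException, TimeoutException and InterruptedException
            // all mean the client cannot currently reach the cluster.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return Status.DOWN;
        } finally {
            executor.shutdownNow(); // interrupts a call that is still hanging
        }
    }

    public static void main(String[] args) {
        KafkaHealthProbe healthy = new KafkaHealthProbe(() -> "ok", 1000);
        KafkaHealthProbe broken = new KafkaHealthProbe(() -> {
            throw new IllegalStateException("The AdminClient thread has exited.");
        }, 1000);
        System.out.println(healthy.check());
        System.out.println(broken.check());
    }
}
```

If a probe like this were wired into the HTTP health endpoint, a Kubernetes liveness probe pointed at that endpoint could restart the pod without manual intervention.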

tchiotludo commented 1 year ago

The best would be to implement either:

rouke-broersma commented 1 year ago

@tchiotludo I'm not much of a Java developer, but shouldn't this exception simply crash the application, since AKHQ currently has no way to recover from it? Alternatively, the exception should be caught and the process restarted automatically. As it stands, AKHQ is left functionally broken but still running, and there is no button to get it back to a working state either.
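One way to picture the "catch it and restart" option is a small helper that walks an exception's cause chain, looks for the message seen in the stack trace above ("The AdminClient thread has exited"), and then runs a recovery action. This is only a sketch under assumptions: `FatalClientErrorDetector`, `isFatal`, `handle`, and the `onFatal` callback are hypothetical names, and matching on message text is brittle and may not survive a Kafka client upgrade.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/** Hypothetical helper; an illustration, not part of AKHQ or Kafka. */
final class FatalClientErrorDetector {
    // Assumption: text taken from the stack trace in this issue.
    static final String FATAL_MARKER = "The AdminClient thread has exited";

    /** True if any exception in the cause chain carries the fatal marker. */
    static boolean isFatal(Throwable error) {
        // Identity set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains(FATAL_MARKER)) {
                return true;
            }
        }
        return false;
    }

    /** Runs the recovery action only for fatal errors; returns whether it ran. */
    static boolean handle(Throwable error, Runnable onFatal) {
        if (isFatal(error)) {
            onFatal.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable root = new RuntimeException("The AdminClient thread has exited.");
        Throwable wrapped = new RuntimeException(new ExecutionException(
                new IllegalStateException("Failed to find brokers to send ListGroups", root)));
        System.out.println(isFatal(wrapped));                           // true
        System.out.println(isFatal(new RuntimeException("unrelated"))); // false
    }
}
```

The `onFatal` action could be as blunt as `() -> System.exit(1)`, which would let the container orchestrator restart the pod, or it could recreate the client in place; which of the two suits AKHQ is left open here.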