confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

Health-checks #71

Open JorgenRingen opened 3 years ago

JorgenRingen commented 3 years ago

Kafka Streams exposes a method for retrieving the current state: org.apache.kafka.streams.KafkaStreams#state. We use this to implement health checks. If the state is CREATED, REBALANCING or RUNNING, the application is considered healthy. Otherwise it is considered unhealthy, and the application would normally be stopped or restarted.
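The rule above boils down to a set-membership check on the current state. A minimal sketch, assuming a local enum that mirrors the main constant names of `org.apache.kafka.streams.KafkaStreams.State` so it compiles without Kafka Streams on the classpath; in a real application you would pass `KafkaStreams#state()` directly:

```kotlin
// Local stand-in for the main constants of org.apache.kafka.streams.KafkaStreams.State
// (assumption: mirrored here only so the sketch runs without the Kafka Streams dependency).
enum class StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// States in which the application is reported as healthy.
val HEALTHY_STATES = setOf(StreamsState.CREATED, StreamsState.REBALANCING, StreamsState.RUNNING)

// Any state outside HEALTHY_STATES means the platform should stop or restart the app.
fun isHealthy(state: StreamsState): Boolean = state in HEALTHY_STATES
```

Treating REBALANCING as healthy avoids restart storms: a rebalance is a normal, transient phase, and restarting an instance during one would only trigger another.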

In PC I've implemented a health-check querying io.confluent.parallelconsumer.ParallelEoSStreamProcessor#isClosedOrFailed == false.

Would a different, more "healthcheck-like" abstraction be smart? Perhaps on the io.confluent.parallelconsumer.ParallelConsumer interface? The current method requires a cast to ParallelEoSStreamProcessor.

(not critical!)

astubbs commented 3 years ago

Do you have any thoughts on which metrics would indicate health? One major aspect is "progress" through the work queues, described here: https://github.com/confluentinc/parallel-consumer/issues/34. Perhaps the time since progress was last made? Or the time since the committable offset last moved, per partition?
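One way to turn "time since the committable offset last moved, per partition" into a health signal is sketched below. `ProgressTracker`, `record` and `stalledPartitions` are hypothetical names, not Parallel Consumer API; the sketch assumes the caller feeds in each partition's committable offset whenever it is recomputed and can tell whether a partition still has queued work.

```kotlin
// Hypothetical tracker (not part of Parallel Consumer): remembers, per partition,
// when the committable offset last advanced.
class ProgressTracker(private val maxStallMillis: Long) {

    private class PartitionProgress(var committableOffset: Long, var lastAdvanceMillis: Long)

    private val partitions = HashMap<Int, PartitionProgress>()

    // Call whenever a partition's committable offset is (re)computed.
    // Only a strictly higher offset counts as progress and resets the timer.
    fun record(partition: Int, committableOffset: Long, nowMillis: Long) {
        val progress = partitions[partition]
        if (progress == null) {
            partitions[partition] = PartitionProgress(committableOffset, nowMillis)
        } else if (committableOffset > progress.committableOffset) {
            progress.committableOffset = committableOffset
            progress.lastAdvanceMillis = nowMillis
        }
    }

    // A partition is stalled if it still has queued work and its committable offset
    // has not advanced for longer than maxStallMillis. Idle partitions (no backlog)
    // are never reported, because "no progress" is expected there.
    fun stalledPartitions(nowMillis: Long, hasBacklog: (Int) -> Boolean): Set<Int> =
        partitions
            .filter { (partition, progress) ->
                hasBacklog(partition) && nowMillis - progress.lastAdvanceMillis > maxStallMillis
            }
            .keys
}
```

A health check could then report DOWN when `stalledPartitions(...)` is non-empty. The backlog check matters: without it, a partition that simply receives no new messages would look identical to one stuck on a poison message.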

JorgenRingen commented 3 years ago

In Kafka Streams, any exception leaked from user code, or any "permanent internal error", causes the state to change to Error -> Pending Shutdown -> Not Running. We use this for our (aggregate) application health checks by polling KS.state. If KS is in an unhealthy state, the app is typically restarted by the container platform. Sometimes a restart lets the app recover; other times a poison message is being consumed, which causes a restart loop and needs manual intervention.

So exposing something like a state on the ParallelConsumer interface for application health checks might be a good start, and any of the "shutdown" scenarios in #34 would change the state to "error".
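A rough sketch of what such a state could look like. Everything here (`ConsumerState`, `StateAware`, `ConsumerStateHolder`) is hypothetical and not part of the Parallel Consumer API; it assumes a failure from one of the #34 shutdown scenarios is reported through `fail(cause)`, and shows how a health check could depend on a narrow interface instead of casting to `ParallelEoSStreamProcessor`.

```kotlin
// Hypothetical lifecycle states for a parallel consumer (not library API).
enum class ConsumerState { RUNNING, CLOSED, ERROR }

// Hypothetical narrow interface a health check could depend on.
interface StateAware {
    val state: ConsumerState
    val failureCause: Throwable?
}

// Minimal holder: transitions are synchronized, reads go through volatile fields.
class ConsumerStateHolder : StateAware {
    @Volatile override var state: ConsumerState = ConsumerState.RUNNING
        private set
    @Volatile override var failureCause: Throwable? = null
        private set

    // A fatal error moves the consumer to ERROR and keeps the cause for diagnostics.
    @Synchronized
    fun fail(cause: Throwable) {
        failureCause = cause // written before state, so a reader seeing ERROR sees the cause
        state = ConsumerState.ERROR
    }

    // A normal close must not hide an earlier failure.
    @Synchronized
    fun close() {
        if (state != ConsumerState.ERROR) state = ConsumerState.CLOSED
    }
}

// Health rule analogous to the Kafka Streams one: only RUNNING counts as UP.
fun StateAware.isHealthy(): Boolean = state == ConsumerState.RUNNING
```

Keeping ERROR sticky on `close()` is what makes the state useful for the restart-loop case: the platform (or an operator) can still see that the consumer died from a failure rather than a clean shutdown.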

I'm using ParallelEoSStreamProcessor#isClosedOrFailed now and it works OK, but a more uniform interface would probably be smart down the road.

Our current Spring Boot Actuator health check for PC :)

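```kotlin
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor
import io.confluent.parallelconsumer.ParallelStreamProcessor
import org.slf4j.LoggerFactory
import org.springframework.boot.actuate.health.Health
import org.springframework.boot.actuate.health.HealthIndicator
import org.springframework.stereotype.Component
import java.lang.invoke.MethodHandles

@Component
class ParallelConsumerHealthIndicator(
    parallelConsumer: ParallelStreamProcessor<String, Any>,
) : HealthIndicator {

    private val logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass())

    // Notice the cast: isClosedOrFailed is only reachable via the implementation class
    private val parallelEosStreamProcessor = parallelConsumer as ParallelEoSStreamProcessor<String, Any>

    override fun health(): Health {
        if (!parallelEosStreamProcessor.isClosedOrFailed) {
            return Health.up().build()
        }
        // Read the failure cause once so the null check and the log use the same value
        val failureCause: Throwable? = parallelEosStreamProcessor.failureCause
        if (failureCause != null) {
            logger.warn("Parallel-consumer is unhealthy! Failure-cause: ${failureCause.message}", failureCause)
        } else {
            logger.warn("Parallel-consumer is closed or unhealthy - no failure-cause.")
        }
        return Health.down().build()
    }
}
```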
astubbs commented 1 year ago

Draft implementation with test: https://github.com/confluentinc/parallel-consumer/pull/464

If you can, take a look and see if there's anything else you'd like to see. I won't get them all the first time around, but that's the goal.