spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Improving Observability in Asynchronous Processing (`CompletableFuture`, `Mono`) #3528

Open chickenchickenlove opened 1 week ago

chickenchickenlove commented 1 week ago

Expected Behavior

When the consumer endpoint method returns a CompletableFuture<?> or Mono<?>, and that CompletableFuture<?> or Mono<?> fails to complete because of an error thrown in the async loop, spring-kafka should report metrics indicating that the async consumer task failed.

For example,

spring_kafka_listener_seconds_count{
error="AsyncLoopExecutionFailedException",
messaging_kafka_consumer_group="topic4",
messaging_operation="receive",
messaging_source_kind="topic",
messaging_source_name="topic4",
messaging_system="kafka",
spring_kafka_listener_id="org.springframework.kafka.KafkaListenerEndpointContainer#0-0"} 6

Current Behavior

When the consumer endpoint method returns a CompletableFuture<?> or Mono<?>, the MessagingMessageListenerAdapter adds a callback to the returned CompletableFuture<?> or Mono<?>.

However, KafkaMessageListenerContainer currently does not consider whether the CompletableFuture or Mono fails to complete.

As long as the CompletableFuture or Mono instance is created successfully, KafkaMessageListenerContainer does not report an error metric, even if the CompletableFuture or Mono later fails to complete.
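The effect can be reproduced without Kafka. The following is a minimal sketch, not spring-kafka code: `SyncOnlyMetricsDemo`, `invokeListener`, and the two counters are hypothetical stand-ins for the container's invoke-and-time logic, assumed here only to show that a synchronous try/catch never sees an exception raised inside the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SyncOnlyMetricsDemo {

    // Hypothetical counters standing in for success/failure timers.
    final AtomicInteger successCount = new AtomicInteger();
    final AtomicInteger failureCount = new AtomicInteger();

    // Hypothetical stand-in for the container: it only observes what
    // happens until the listener method returns.
    CompletableFuture<Void> invokeListener(Supplier<CompletableFuture<Void>> listener) {
        try {
            CompletableFuture<Void> result = listener.get();
            successCount.incrementAndGet(); // counted as soon as the future is created
            return result;
        }
        catch (RuntimeException e) {
            failureCount.incrementAndGet(); // only reached for synchronous failures
            throw e;
        }
    }

    // A listener whose future is created successfully but fails to complete.
    static CompletableFuture<Void> failingListener() {
        return CompletableFuture.runAsync(() -> {
            throw new RuntimeException("Should report metric.");
        });
    }

    public static void main(String[] args) {
        SyncOnlyMetricsDemo demo = new SyncOnlyMetricsDemo();
        CompletableFuture<Void> future = demo.invokeListener(SyncOnlyMetricsDemo::failingListener);
        try {
            future.join(); // wait for the async failure to surface
        }
        catch (CompletionException e) {
            System.out.println("async failure: " + e.getCause().getMessage());
        }
        System.out.println("success=" + demo.successCount.get()
                + ", failure=" + demo.failureCount.get());
    }
}
```

Running `main` shows one recorded success and no recorded failure even though the future completed exceptionally, which is the gap described above.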

Context

I discovered this issue while working on another problem (GH-3276), and I verified that it actually occurs.

To reproduce it, you can use the method below. It demonstrates a case where:

  1. The CompletableFuture<?> is created successfully.
  2. The CompletableFuture<?> fails to complete.

    @Slf4j
    @Component
    public class MyAsyncBean {
    
        @RetryableTopic(
                attempts = "5",
                topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
                autoCreateTopics = "true",
                backoff = @Backoff(delay = 1000)
        )
        @KafkaListener(topics = "topic4", groupId = "topic4")
        public CompletableFuture<Void> receive(ConsumerRecord<Object, Object> record) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
    
                throw new RuntimeException("Should report metric.");
            });
        }
    }

To reviewer

If you consider this to be a reasonable request, may I take on this task and try to solve it?

artembilan commented 6 days ago

I think this makes sense, but it does not look that easy to fix. We have code like this in the KafkaMessageListenerContainer:

    return observation.observe(() -> {
        try {
            invokeOnMessage(cRecord);
            successTimer(sample, cRecord);
            recordInterceptAfter(cRecord, null);
        }
        catch (RuntimeException e) {
            failureTimer(sample, cRecord);
            recordInterceptAfter(cRecord, e);
            if (this.commonErrorHandler == null) {
                throw e;
            }
            observation.error(e);

So, as soon as the listener returns control here, the observation is closed. It sounds like we must not use observe(), but rather call start() manually and deal with the observation in the mentioned MessagingMessageListenerAdapter: stop it there (or call error()) in the async branch. But I need to dig more into the technical details. Feel free to give it a try!

chickenchickenlove commented 4 days ago

@artembilan thanks for considering this a reasonable request! As you mentioned, the Observation instance closes its observe() scope as soon as the listener invocation for each record returns.

This means that the Observation instance in KafkaMessageListenerContainer does not take the result of the CompletableFuture<?> or Mono<?> into account.

Therefore, I believe a new Observation instance is needed here, similar to the one in KafkaMessageListenerContainer.

    // MessagingMessageListenerAdapter.java
    if (result instanceof CompletableFuture<?> completable) {
        ...
        completable.whenComplete((r, t) -> {
            // newObservation maybe?
            observation.observe(() -> {
                if (t == null) {
                    asyncSuccess(r, replyTopic, source, messageReturnType);
                    acknowledge(acknowledgment);
                }
                else {
                    asyncFailure(request, acknowledgment, consumer, t, source);
                }
            });
            ...

This approach cannot measure processing time accurately. If accurate async processing time is needed, developers would have to create an observation instance and include it in their endpoint method.

To sum up, I think we have two options.

  1. Add a new observation instance to MessagingMessageListenerAdapter.handleResult() and open a new observation scope. (Pros: readability. Cons: it may report inaccurate processing time.)
  2. Declare a new Observation class for measuring async objects, and guide developers to include it in their endpoint methods. (Pros: reports accurate processing time. Cons: readability.)

    @KafkaListener(...)
    static class TopicListener {
    
        @KafkaHandler
        public Mono<Void> listenAgain(String message) {
            // AsyncAwareObservation is the proposed new class; it does not exist yet.
            Observation ob = new AsyncAwareObservation();
            return Mono.fromCallable(() -> {
                ob.observe(() -> {
                    try {
                        // Developer's logic.
                        ...
                    }
                    catch (Exception e) {
                        ob.error(e);
                    }
                });
                ...
            });
        }
    }

What do you think? When you have time, please let me know 🙇‍♂️

artembilan commented 4 days ago

Well, async or not, it is still a unit of work. For an observation it does not matter if it was started in one thread and finished in another. So, propagating it from one thread to another until that Mono/CompletableFuture is completed is the way to go. See more info in the Micrometer docs: https://micrometer.io/docs/observation#instrumentation_of_thread_switching_components.

I don't think, though, that we need to instrument an async executor; having a variable for the registry.getCurrentObservation() and propagating it down to whenComplete() should be enough. The KafkaMessageListenerContainer must then just start an observation, and the MessagingMessageListenerAdapter should get it in its handleResult() to stop it in the sync branch, or to propagate it for stop or error in whenComplete().
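The shape of that proposal can be sketched in plain Java. This is an illustration under stated assumptions, not the eventual spring-kafka change and not Micrometer's API: `FakeObservation`, `dispatch`, and this `handleResult` signature are hypothetical stand-ins that only mimic a started observation being handed to the adapter, which stops it immediately for a synchronous result or stops it (recording any error) when the future completes.

```java
import java.util.concurrent.CompletableFuture;

class ManualObservationSketch {

    // Hypothetical stand-in for an observation; NOT Micrometer's Observation.
    static final class FakeObservation {
        private final long startNanos = System.nanoTime(); // "started" on creation
        private volatile Throwable error;
        private volatile boolean stopped;
        private volatile long durationNanos;

        void error(Throwable t) { this.error = t; }

        void stop() {
            this.durationNanos = System.nanoTime() - this.startNanos;
            this.stopped = true;
        }

        boolean isStopped() { return this.stopped; }
        Throwable getError() { return this.error; }
        long getDurationNanos() { return this.durationNanos; }
    }

    // Container role (assumed): start the observation, then delegate.
    static FakeObservation dispatch(Object listenerResult) {
        FakeObservation observation = new FakeObservation();
        handleResult(listenerResult, observation);
        return observation;
    }

    // Adapter role (assumed): decide when the unit of work really ends.
    static void handleResult(Object result, FakeObservation observation) {
        if (result instanceof CompletableFuture<?>) {
            CompletableFuture<?> completable = (CompletableFuture<?>) result;
            completable.whenComplete((r, t) -> {
                if (t != null) {
                    observation.error(t); // async failure becomes visible
                }
                observation.stop();       // duration covers the async work
            });
        }
        else {
            observation.stop();           // sync branch: stop right away
        }
    }

    public static void main(String[] args) {
        FakeObservation sync = dispatch("plain result");
        System.out.println("sync stopped=" + sync.isStopped());

        CompletableFuture<String> pending = new CompletableFuture<>();
        FakeObservation async = dispatch(pending);
        System.out.println("async stopped before completion=" + async.isStopped());

        pending.completeExceptionally(new IllegalStateException("async loop failed"));
        System.out.println("async stopped after completion=" + async.isStopped()
                + ", error=" + async.getError().getMessage());
    }
}
```

Because the same observation object is captured by the completion callback, no executor instrumentation is involved in this sketch; whether the real change also needs scope or context propagation is left open here.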

sobychacko commented 4 days ago

@chickenchickenlove We will take a crack at this on our end and share the feedback.

chickenchickenlove commented 3 days ago

Thanks for the detailed explanation! 🙇‍♂️ I will study it even if I can't complete the task.

Also, thanks for keeping me updated on what's going on!