vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0
84 stars 82 forks source link

Closing a consumer might never produce a completed future when `Consumer.close()` throws an exception #221

Closed pierDipi closed 2 years ago

pierDipi commented 2 years ago

Version

Which version(s) did you encounter this bug ? 4.2.5

Context

When closing the underlying Kafka consumer fails for some reason, the returned future is never complete.

Do you have a reproducer?

This test will produce timeout errors since the Future<Void> as defined like this f = KafkaReadStream.close() will never be complete:

package io.vertx.kafka.client.tests;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import org.apache.kafka.common.KafkaException;

import java.time.Duration;
import java.util.Properties;

public class ConsumerFailedCloseTest extends ConsumerTestBase {

  @Override
  <K, V> KafkaReadStream<K, V> createConsumer(Vertx vertx, Properties config) {
    return new KafkaReadStreamImpl<>(
      vertx,
      new org.apache.kafka.clients.consumer.KafkaConsumer<K, V>(config) {
        @Override
        public void close(final Duration timeout) {
          super.close(timeout);
          throw new KafkaException("failed to close consumer");
        }
      },
      KafkaClientOptions.fromProperties(config, false));
  }
}

Steps to reproduce

  1. Add ConsumerFailedCloseTest to the test suite
  2. Run the test

Extra

Possible patch

diff --git a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
index b88ee8c..77cc7b1 100644
--- a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
+++ b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
@@ -706,16 +706,31 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {
   @Override
   public void close(Handler<AsyncResult<Void>> completionHandler) {
     if (this.closed.compareAndSet(false, true)) {
+      // Call wakeup before closing the consumer, so that existing tasks in the executor queue will
+      // wake up while we wait for processing the below added "close" task.
+      this.consumer.wakeup();
+
       this.worker.submit(() -> {
-        this.consumer.close();
+        // This flag captures whether the completionHandler has already been called due to an exception thrown by
+        // consumer.close() since we don't want to call it twice, while register the shutdown handler for
+        // the `worker` ExecutorService regardless of the previously thrown exception.
+        boolean closeFailed = false;
+        try {
+          this.consumer.close();
+        } catch (final Throwable ex) {
+          closeFailed = true;
+          if (completionHandler != null) {
+            completionHandler.handle(Future.failedFuture(ex));
+          }
+        }
+        final boolean closeFailedFinal = closeFailed;
         this.context.runOnContext(v -> {
           this.worker.shutdownNow();
-          if (completionHandler != null) {
+          if (completionHandler != null && !closeFailedFinal) {
             completionHandler.handle(Future.succeededFuture());
           }
         });
       });
-      this.consumer.wakeup();
     }
     else {
       if (completionHandler != null) {
pierDipi commented 2 years ago

@tsegismont @ppatierno what do you think about the proposed patch?

tsegismont commented 2 years ago

I believe it is a good idea to catch KafkaException, but not just any Throwable (including, e.g, OutOfMemoryError).

Then in the implementation there is room for improvement, I think.

You could do (I haven't checked if this compiles but you'll get the idea):

  @Override
  public Future<Void> close() {
    ContextInternal ctx = (ContextInternal) this.context;
    if (this.closed.compareAndSet(false, true)) {
      Promise<Void> promise = ctx.promise();

      // Call wakeup before closing the consumer, so that existing tasks in the executor queue will
      // wake up while we wait for processing the below added "close" task.
      this.consumer.wakeup();

      this.worker.submit(() -> {
        try {
          this.consumer.close();
          promise.complete();
        } catch (KafkaException ex) {
          promise.fail(ex);
        }
      });
      return promise.future().onComplete(v-> this.worker.shutdownNow());
    }
    return ctx.succeededFuture();
  }

  @Override
  public void close(Handler<AsyncResult<Void>> completionHandler) {
    Future<Void> future = close();
    if (completionHandler != null) {
      future.onComplete(completionHandler);
    }
  }
pierDipi commented 2 years ago

Thanks Thomas! I will send a PR based on your suggestion