vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0

Move blocking send operation to executor thread #251

Open Arooba-git opened 1 year ago

Arooba-git commented 1 year ago

Hi! :)

Thank you for this project..

This PR fixes a blocking call in the send method of the KafkaWriteStreamImpl class, which was detected with the help of BlockHound:

[screenshot: vkc1-blocking]
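
(For readers unfamiliar with BlockHound, the sketch below shows one way it is typically wired into a test run. It is illustrative only; in particular, the predicate marking Vert.x event-loop threads as non-blocking is an assumption, not something taken from this PR.)

    import reactor.blockhound.BlockHound;

    public class BlockHoundSetupExample {
      public static void main(String[] args) {
        // Install BlockHound's instrumentation. Afterwards, a blocking call made on a
        // thread marked as non-blocking throws reactor.blockhound.BlockingOperationError,
        // which is the kind of report referenced above.
        // The thread-name check is only illustrative: by default BlockHound knows about
        // Reactor/RxJava threads, so Vert.x event-loop threads have to be marked explicitly.
        BlockHound.install(builder -> builder.nonBlockingThreadPredicate(current ->
            current.or(t -> t.getName().startsWith("vert.x-eventloop-thread"))));
      }
    }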

We re-ran the test cases and also evaluated performance (in terms of sleep time latency of the SendThread) before and after the fix:

Before:
[screenshot: vks1-latency-after]

After:
[screenshot: vks1-latency-before]

Ladicek commented 1 year ago

The producer.send() operation is already called on a worker thread (ctx.executeBlocking()), so this helps nothing AFAICT.
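
(For context, the pattern being referred to looks roughly like this; a simplified sketch, not the actual KafkaWriteStreamImpl source, with imports and the surrounding class omitted.)

    // Simplified sketch (assumed, not copied from the repository) of the current approach:
    // the potentially blocking producer.send() runs inside ctx.executeBlocking(),
    // i.e. on a Vert.x worker-pool thread rather than on the event loop.
    <K, V> Future<RecordMetadata> sendOnWorkerPool(ContextInternal ctx,
                                                   Producer<K, V> producer,
                                                   ProducerRecord<K, V> record) {
      return ctx.executeBlocking(promise ->
        producer.send(record, (metadata, err) -> {
          if (err != null) {
            promise.fail(err);
          } else {
            promise.complete(metadata);
          }
        })
      );
    }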

Arooba-git commented 1 year ago

@Ladicek Right, you know I was wondering the same.. :) Then I came across a similar issue from another user, where executeBlocking still produced blocking-call errors: https://github.com/eclipse-vertx/vert.x/issues/2798

The Vert.x developers then clarified its use in their documentation:

WARNING: Blocking code should block for a reasonable amount of time (i.e no more than a few seconds). Long blocking operations or polling operations (i.e a thread that spin in a loop polling events in a blocking fashion) are precluded. When the blocking operation lasts more than the 10 seconds, a message will be printed on the console by the blocked thread checker. Long blocking operations should use a dedicated thread managed by the application...

https://vertx.io/docs/apidocs/io/vertx/rxjava/core/Context.html
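
(In Vert.x terms, one way to follow that advice is a dedicated, named worker executor whose max execute time matches the expected worst case. This is only a sketch of the documented API, not what this PR does.)

    import io.vertx.core.Vertx;
    import io.vertx.core.WorkerExecutor;
    import java.util.concurrent.TimeUnit;

    public class DedicatedExecutorExample {
      public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        // A dedicated, named worker pool; the blocked-thread checker only warns
        // once the configured max execute time (here 120 seconds) is exceeded.
        WorkerExecutor kafkaSendExecutor =
            vertx.createSharedWorkerExecutor("kafka-producer-send", 1, 120, TimeUnit.SECONDS);

        kafkaSendExecutor.executeBlocking(promise -> {
          // potentially long-blocking work, e.g. producer.send(...)
          promise.complete();
        });
      }
    }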

vietj commented 1 year ago

I believe the thing that matters here is: is a blocking operation of that duration expected or not?

pierDipi commented 1 year ago

@vietj yes, send blocks on metadata updates for up to max.block.ms.

This is particularly evident when the Kafka cluster is down or unavailable.
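
(For context, max.block.ms is the standard Kafka producer setting, 60 seconds by default, that bounds how long send() may block waiting for metadata or for buffer space; the snippet below is purely illustrative.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class MaxBlockConfigExample {
      public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Upper bound on how long KafkaProducer.send() and partitionsFor() may block
        // waiting for metadata or buffer space; defaults to 60000 ms. With an
        // unreachable cluster, send() can block for this full duration.
        config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
      }
    }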

vietj commented 1 year ago

so I think we should use this specific executor; however, it should not be nested inside executeBlocking but used directly instead

Arooba-git commented 1 year ago

@vietj Should I try updating?

vietj commented 1 year ago

yes you should

Arooba-git commented 6 months ago

Apologies for the delayed response..

If I replace executeBlocking() with executor.execute(..), the return type will become void, right? So we would have to update every usage of the send method... no? 🤔

Or should we return a promise, like this:

  @Override
  public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    ContextInternal ctx = vertx.getOrCreateContext();
    ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record);
    int len = this.len(record.value());
    this.pending += len;

    Promise<RecordMetadata> prom = ctx.promise();
    try {
      // offload the potentially blocking producer.send() call to the dedicated executor
      executor.execute(() -> {
        this.producer.send(record, (metadata, err) -> {
          // callback from Kafka IO thread
          ctx.runOnContext(v1 -> {
            synchronized (KafkaWriteStreamImpl.this) {

              // if exception happens, no record written
              if (err != null) {
                if (this.exceptionHandler != null) {
                  Handler<Throwable> exceptionHandler = this.exceptionHandler;
                  ctx.runOnContext(v2 -> exceptionHandler.handle(err));
                }
              }

              long lowWaterMark = this.maxSize / 2;
              this.pending -= len;
              if (this.pending < lowWaterMark && this.drainHandler != null) {
                Handler<Void> drainHandler = this.drainHandler;
                this.drainHandler = null;
                ctx.runOnContext(drainHandler);
              }
            }
          });

          if (err != null) {
            if (startedSpan != null) {
              startedSpan.fail(ctx, err);
            }
            prom.fail(err);
          } else {
            if (startedSpan != null) {
              startedSpan.finish(ctx);
            }
            prom.complete(metadata);
          }
        });
      });
    } catch (Throwable e) {
      // e.g. the executor rejected the task
      synchronized (KafkaWriteStreamImpl.this) {
        if (this.exceptionHandler != null) {
          Handler<Throwable> exceptionHandler = this.exceptionHandler;
          ctx.runOnContext(v3 -> exceptionHandler.handle(e));
        }
      }
      if (startedSpan != null) {
        startedSpan.fail(ctx, e);
      }
      prom.fail(e);
    }

    return prom.future();
  }
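
(A note on the snippet above: it assumes an executor field is available on KafkaWriteStreamImpl, which this discussion leaves open. One hypothetical way to provide it, in line with the documentation warning quoted earlier about long blocking operations using a dedicated application-managed thread, would be:)

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class KafkaSendExecutorHolder {
      // Hypothetical, not part of this PR's diff: a dedicated single thread for the
      // potentially long-blocking producer.send() calls. Daemon so it does not block
      // JVM shutdown; it should be shut down when the write stream is closed.
      final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kafka-producer-send");
        t.setDaemon(true);
        return t;
      });
    }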
vietj commented 5 months ago

it is not clear what is happening since we are already in an executeBlocking block. What is the actual issue we are trying to fix? Does it mean we should avoid executeBlocking with Kafka?