vert-x3 / vertx-amqp-client

An AMQP client for Vert.x
Apache License 2.0
17 stars 18 forks source link

handleMessage method polls buffered message without using it #36

Closed ferhatsahinkaya closed 4 years ago

ferhatsahinkaya commented 4 years ago
  private void handleMessage(AmqpMessageImpl message) {
    boolean schedule = false;
    boolean dispatchNow = false;

    synchronized (this) {
      if (handler != null && demand > 0L && buffered.isEmpty()) {
        if (demand != Long.MAX_VALUE) {
          demand--;
        }
        dispatchNow = true;
      } else if (handler != null && demand > 0L) {
        // Buffered messages present, deliver the oldest of those instead
        buffered.add(message);
        message = buffered.poll();
        if (demand != Long.MAX_VALUE) {
          demand--;
        }

        // Schedule a delivery for the next buffered message
        schedule = true;
      } else {
        // Buffer message until we aren't paused
        buffered.add(message);
      }
    }

    // schedule next delivery if appropriate, after earlier delivery to allow chance to pause etc.
    if (schedule) {
      scheduleBufferedMessageDelivery();
    } else if (dispatchNow) {
      deliverMessageToHandler(message);
    }
  }

Version: 3.9.1

In AmqpReceiverImpl.handleMessage implementation above, when following conditions hold, the oldest message in the buffer is removed without using it.

Looks like issue can be fixed with either of the following

Probably the preferred way would be dispatching the message immediately as there is no need to wait for dispatching the message later.

cescoffier commented 4 years ago

Do you have a reproducer showing the issue?

And yes, I believe that in this case, we need to deliver the message before setting schedule to true.

ferhatsahinkaya commented 4 years ago

Please find the project reproducing the issue here.

I can locally fix the issue by dispatching the message immediately in the following case:

vietj commented 4 years ago

thanks, can you make a PR with a test and a fix that we will review?