vert-x3 / vertx-amqp-client

An AMQP client for Vert.x
Apache License 2.0

scheduleBufferedMessageDelivery decrements the demand without delivering a message #38

Closed ferhatsahinkaya closed 4 years ago

ferhatsahinkaya commented 4 years ago
  private void scheduleBufferedMessageDelivery() {
    boolean schedule;

    synchronized (this) {
      schedule = !buffered.isEmpty() && demand > 0L;
    }

    if (schedule) {
      connection.runOnContext(v -> {
        AmqpMessageImpl message = null;

        synchronized (this) {
          if (demand > 0L) {
            if (demand != Long.MAX_VALUE) {
              demand--;
            }
            message = buffered.poll();
          }
        }

        if (message != null) {
          // Delivering outside the synchronized block
          deliverMessageToHandler(message);

          // Schedule a delivery for a further buffered message if any
          scheduleBufferedMessageDelivery();
        }
      });
    }
  }

Version: 3.9.1

scheduleBufferedMessageDelivery registers an async event to deliver a message to the handler. However, if the buffer is empty and demand is greater than 0 when that event runs, demand is still decremented even though no message is delivered to the handler, because buffered.poll() returns null and message stays null.

At first glance, one might think that the async event will not be scheduled if the buffer is empty. However, there is no guarantee that the scheduling condition still holds by the time the async event actually runs. scheduleBufferedMessageDelivery has multiple callers (fetch, handleMessage, handler, and scheduleBufferedMessageDelivery itself), so several delivery events can be pending at the same time.

In the scenario below, demand is decremented incorrectly and that stops message delivery from the AmqpReceiver buffer to the handlers.
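One interleaving consistent with this description can be sketched with a simplified, single-threaded model. This is only an illustration, not the library's code: BuggyReceiverModel, the context task queue (standing in for connection.runOnContext), runPendingTasks, and the String messages are all hypothetical names introduced here. The delivery logic mirrors the snippet above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model of the receiver (hypothetical, for illustration only).
class BuggyReceiverModel {
  final Queue<String> buffered = new ArrayDeque<>();
  // Assumption: this queue stands in for tasks posted via connection.runOnContext.
  final Queue<Runnable> context = new ArrayDeque<>();
  final List<String> delivered = new ArrayList<>();
  long demand;

  void scheduleBufferedMessageDelivery() {
    boolean schedule = !buffered.isEmpty() && demand > 0L;
    if (schedule) {
      context.add(() -> {
        String message = null;
        if (demand > 0L) {
          if (demand != Long.MAX_VALUE) {
            demand--; // decremented before we know whether a message exists
          }
          message = buffered.poll();
        }
        if (message != null) {
          delivered.add(message);
          scheduleBufferedMessageDelivery();
        }
      });
    }
  }

  // Runs queued tasks in order, as an event loop would.
  void runPendingTasks() {
    Runnable task;
    while ((task = context.poll()) != null) {
      task.run();
    }
  }

  public static void main(String[] args) {
    BuggyReceiverModel r = new BuggyReceiverModel();
    r.demand = 2;
    r.buffered.add("m1");
    // Two callers both observe "buffer not empty, demand > 0" before either task runs.
    r.scheduleBufferedMessageDelivery();
    r.scheduleBufferedMessageDelivery();
    r.runPendingTasks();
    // One message was delivered, but both units of demand were consumed.
    System.out.println("delivered=" + r.delivered + " demand=" + r.demand);

    // A later message is buffered, but demand is already 0, so nothing is scheduled.
    r.buffered.add("m2");
    r.scheduleBufferedMessageDelivery();
    r.runPendingTasks();
    System.out.println("delivered=" + r.delivered + " buffered=" + r.buffered);
  }
}
```

In this model the second task finds the buffer already drained by the first, yet it still consumes one unit of demand, so the message that arrives afterwards stays in the buffer.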

This issue may permanently stop the delivery of messages to the handler.

When no message is actually retrieved from the buffer, demand should not be decremented, because the demand has not been met by a message delivery to the handler.
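A minimal sketch of that idea, using a simplified single-threaded model: poll the buffer first and decrement demand only if a message was actually obtained. FixedReceiverModel, the context task queue (standing in for connection.runOnContext), and runPendingTasks are hypothetical names for illustration, and this is not necessarily the change applied in the library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model with the proposed accounting (hypothetical).
class FixedReceiverModel {
  final Queue<String> buffered = new ArrayDeque<>();
  // Assumption: this queue stands in for tasks posted via connection.runOnContext.
  final Queue<Runnable> context = new ArrayDeque<>();
  final List<String> delivered = new ArrayList<>();
  long demand;

  void scheduleBufferedMessageDelivery() {
    boolean schedule = !buffered.isEmpty() && demand > 0L;
    if (schedule) {
      context.add(() -> {
        String message = null;
        if (demand > 0L) {
          message = buffered.poll();
          // Consume demand only when a message will really be delivered.
          if (message != null && demand != Long.MAX_VALUE) {
            demand--;
          }
        }
        if (message != null) {
          delivered.add(message);
          scheduleBufferedMessageDelivery();
        }
      });
    }
  }

  // Runs queued tasks in order, as an event loop would.
  void runPendingTasks() {
    Runnable task;
    while ((task = context.poll()) != null) {
      task.run();
    }
  }

  public static void main(String[] args) {
    FixedReceiverModel r = new FixedReceiverModel();
    r.demand = 2;
    r.buffered.add("m1");
    r.scheduleBufferedMessageDelivery(); // two pending tasks, one buffered message
    r.scheduleBufferedMessageDelivery();
    r.runPendingTasks();
    System.out.println("delivered=" + r.delivered + " demand=" + r.demand);

    r.buffered.add("m2"); // remaining demand lets the next message through
    r.scheduleBufferedMessageDelivery();
    r.runPendingTasks();
    System.out.println("delivered=" + r.delivered + " demand=" + r.demand);
  }
}
```

With this ordering, a task that finds the buffer empty leaves demand untouched, so a message buffered later can still be scheduled and delivered.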

cescoffier commented 4 years ago

Isn't this one a duplicate of #36?

cescoffier commented 4 years ago

Ah no, not really; it's a bit different, sorry.