Azure / azure-service-bus-java

☁️ Java client library for Azure Service Bus
https://azure.microsoft.com/services/service-bus
MIT License

IMessageReceiver.receiveBatch(int maxMessageCount, Duration serverWaitTime) Issue #391

Closed ganeshpft closed 4 years ago

ganeshpft commented 5 years ago

Actual Behavior

  1. The receiveBatch API receives fewer messages than the given max count even though messages are available in the bus, and the Active Message count is reduced by the provided max count instead of by the received message count.

Expected Behavior

  1. The receiveBatch API should receive messages from the service bus, and the Active Message count should be reduced by the received message count.

Steps To Produce

  1. Post 1000 messages to Azure Service Bus Queue.
  2. Use the receiveBatch(maxMessageCount, Duration.ofSeconds(30)) API to receive messages with ReceiveMode.RECEIVEANDDELETE and TransportType.AMQP_WEB_SOCKETS. ASB Code Snippet.txt is attached for reference.
  3. Execute the above steps for multiple iterations. Iteration Details is attached.

API References

  1. https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.servicebus.imessagereceiver.receivebatch?view=azure-java-stable#com_microsoft_azure_servicebus_IMessageReceiver_receiveBatch_int_Duration_

Versions

Attachments

Iteration Details ASB - ReceiveBatch log.txt ASB Code Snippet.txt

yvgopal commented 5 years ago

The receiveBatch() API says that it will receive up to the batch size of messages, not exactly the batch size. We don't have any API that fetches exactly the batch size of messages.
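
Since no call returns exactly the batch size, a caller who needs a fixed number of messages has to loop. The sketch below illustrates that idea only. `BatchAccumulator` and `receiveUpTo` are hypothetical names, and the `BiFunction` parameter stands in for a `receiveBatch(maxMessageCount, serverWaitTime)` call. It assumes an empty or null result means nothing arrived within the wait time.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper, not part of the Service Bus client library. */
final class BatchAccumulator {

    /**
     * Calls batchReceive repeatedly on the SAME receiver until `target`
     * items are collected or a call returns nothing.
     */
    static <T> List<T> receiveUpTo(
            BiFunction<Integer, Duration, Collection<T>> batchReceive,
            int target,
            Duration waitPerCall) {
        List<T> collected = new ArrayList<>();
        while (collected.size() < target) {
            // Ask only for what is still missing.
            Collection<T> batch =
                    batchReceive.apply(target - collected.size(), waitPerCall);
            if (batch == null || batch.isEmpty()) {
                break; // nothing arrived within the wait time; stop instead of spinning
            }
            collected.addAll(batch);
        }
        return collected;
    }
}
```

With the real client, the function would wrap a call on one long-lived receiver. The library's receiveBatch declares checked exceptions, so the wrapper would need to catch or rethrow them.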

ganeshpft commented 5 years ago

The actual issue here is loss of messages. The Active Message count is reduced by the given batch size instead of by the received message count.

yvgopal commented 5 years ago

RECEIVEANDDELETE mode might lose messages, but only in case of network errors. It shouldn't lose almost 100 messages as in your case. Can you reproduce the same thing using the regular AMQP transport, without WebSockets? Is the entity partitioned? How did you send the messages: over WebSockets or something else? Are any messages larger than 4 KB? For partitioned entities, receiving messages larger than 4 KB wouldn't have worked. A fix is being rolled out to the service this week.

ganeshpft commented 5 years ago

Partitioning is disabled. Messages were posted using TransportType.AMQP_WEB_SOCKETS, and all posted messages are smaller than 4 KB. (We are already aware of the 4 KB issue.)

I will execute the same test case with TransportType.AMQP and then publish the results.

ganeshpft commented 5 years ago

We face the same issue with TransportType.AMQP as well. Please find the iteration results attached: Transport AMQP Iteration Results

Library used: azure-service-2.0.0.jar

POM

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-servicebus</artifactId>
    <version>2.0.0</version>
</dependency>

yvgopal commented 4 years ago

Now I see what is causing your issue. After every receiveBatch call, you are closing the receiver. That's why those messages are lost: they are in the closed receiver. Here is what is happening:

  1. You open a receive link
  2. You send a request for 5 (your batch size) messages.
  3. Messages arrive at the receiver one after the other. All 5 don't come in one batch. There is no batching per se.
  4. The receive request returns as quickly as possible without waiting for all the messages requested in the receiveBatch call. You can ask why it returns quickly instead of waiting to receive the whole batch. What if there are only two messages in the queue when you call receiveBatch(5)? Should it wait until the timeout and then return only two messages, or return as soon as there is at least one message? We chose the second option.
  5. After your receiveBatch call returns two messages, the other 3 messages arrive at the receiver from the service.
  6. If you call another receiveBatch(5) now, you will get the 3 messages plus any more messages that come from the service.
  7. If you close the receiver without doing step 6, then you lose those 3 messages as they are deleted in the service.
  8. The general use case is one receiver serving multiple receive requests. If you want to use one receiver per receive call, you had better switch to PEEKLOCK mode or use receive() instead of receiveBatch().
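
The steps above can be mimicked with a toy model. This is only a sketch of the described behavior, not the library's implementation. `ReceiveAndDeleteModel`, `earlyReturn` and both driver methods are hypothetical. It assumes the service deletes a message as soon as it sends it (RECEIVEANDDELETE), and that a call returns once `earlyReturn` messages have arrived while the rest land in a local buffer afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the steps above; NOT the client library's real code. */
final class ReceiveAndDeleteModel {
    private final Deque<Integer> serviceQueue;                      // messages still in the service
    private final Deque<Integer> localBuffer = new ArrayDeque<>();  // arrived after a call returned
    private final int earlyReturn;                                  // hypothetical: arrivals before the call returns

    ReceiveAndDeleteModel(Deque<Integer> serviceQueue, int earlyReturn) {
        this.serviceQueue = serviceQueue;
        this.earlyReturn = earlyReturn;
    }

    List<Integer> receiveBatch(int max) {
        List<Integer> out = new ArrayList<>();
        // Step 6: messages already sitting in the receiver are returned first.
        while (out.size() < max && !localBuffer.isEmpty()) out.add(localBuffer.poll());
        if (!out.isEmpty()) return out;
        // Steps 2-5: the service sends (and deletes) up to `max` messages;
        // only the first few make it into this call's result.
        for (int i = 0; i < max && !serviceQueue.isEmpty(); i++) {
            int msg = serviceQueue.poll();
            if (out.size() < earlyReturn) out.add(msg); else localBuffer.add(msg);
        }
        return out;
    }

    /** Step 7: closing discards buffered messages; returns how many were lost. */
    int close() {
        int lost = localBuffer.size();
        localBuffer.clear();
        return lost;
    }

    private static Deque<Integer> fill(int total) {
        Deque<Integer> q = new ArrayDeque<>();
        for (int i = 0; i < total; i++) q.add(i);
        return q;
    }

    /** One receiver per call, as in the reported test. Returns {received, lost}. */
    static int[] closeAfterEachCall(int total, int max, int earlyReturn) {
        Deque<Integer> service = fill(total);
        int received = 0, lost = 0;
        while (!service.isEmpty()) {
            ReceiveAndDeleteModel r = new ReceiveAndDeleteModel(service, earlyReturn);
            received += r.receiveBatch(max).size();
            lost += r.close();
        }
        return new int[] {received, lost};
    }

    /** One receiver reused until it returns nothing. Returns {received, lost}. */
    static int[] reuseOneReceiver(int total, int max, int earlyReturn) {
        ReceiveAndDeleteModel r = new ReceiveAndDeleteModel(fill(total), earlyReturn);
        int received = 0;
        for (List<Integer> b = r.receiveBatch(max); !b.isEmpty(); b = r.receiveBatch(max)) {
            received += b.size();
        }
        return new int[] {received, r.close()};
    }
}
```

In this model, with 10 messages, a batch size of 5 and 2 messages arriving before each call returns, closing after every call receives 4 messages and loses 6, while reusing one receiver receives all 10.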

ganeshpft commented 4 years ago

Is there any way to read the messages from the closed receiver? Or, before closing the connection, can we check whether any messages are available in the receiver?

One receiver per receive call with PEEKLOCK mode also won't solve the receiveBatch problem. (n = batch size, x = received message count.) In a receiveBatch API call, x messages are received and the remaining n - x messages stay in the locked state because the connection was closed. In this scenario, the locked messages become available again only after the lock duration expires. There is also a chance of them moving to the DLQ if the max delivery count is exceeded.

yvgopal commented 4 years ago

No. You cannot read messages from a closed receiver. I agree with the second observation. Still, one receiver for each receiveBatch() call is not a good idea.