Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] azure service bus sending messages with expired locks #37497

Closed s17b2-voroneckij closed 10 months ago

s17b2-voroneckij commented 10 months ago

Describe the bug I am running an app that listens to Service Bus in Dubai, UAE, but the message broker is located in Europe. When there is a very large message in the queue (17-25 MB), its delivery takes so long (more than a minute, it appears) that by the time the Azure Service Bus client hands me the message, the message lock has already expired, so renewing it is not possible. As a result, I am not able to complete the message, and it is eventually dead-lettered.

Exception or Stack Trace

com.azure.messaging.servicebus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions. Reference:fe5fcebf-336e-4cd0-8812-2210b6780a41, TrackingId:45249ec100001ba300022d566544d447_G12_B36, SystemTracker:G12:19867276:iris.5a6685ec-9c0b-431c-bb1a-3ae9c6ac97bc_7bdd80_1699009603347, Timestamp:2023-11-03T11:10:46, errorContext[NAMESPACE: elexon-iris.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: iris.5a6685ec-9c0b-431c-bb1a-3ae9c6ac97bc, REFERENCE_ID: iris.5a6685ec-9c0b-431c-bb1a-3ae9c6ac97bc_7bdd80_1699009603347, LINK_CREDIT: 0]
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$50(ServiceBusReceiverAsyncClient.java:1602)
    at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
    at reactor.core.publisher.Operators.error(Operators.java:198)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1886)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:340)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:363)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork.onComplete(ReceiverUnsettledDeliveries.java:736)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.completeDispositionWorkWithSettle(ReceiverUnsettledDeliveries.java:528)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.handleRetriableRejectedRemoteOutcome(ReceiverUnsettledDeliveries.java:389)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.onDispositionAck(ReceiverUnsettledDeliveries.java:218)
    at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.decodeDelivery(ServiceBusReactorReceiver.java:165)
    at com.azure.core.amqp.implementation.ReactorReceiver.lambda$new$0(ReactorReceiver.java:114)
    at com.azure.core.amqp.implementation.handler.DispatchHandler.onTimerTask(DispatchHandler.java:32)
    at com.azure.core.amqp.implementation.ReactorDispatcher$WorkScheduler.run(ReactorDispatcher.java:207)
    at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
    at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
    at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:390)
    at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:100)
        at reactor.core.publisher.Mono.block(Mono.java:1742)
        at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.complete(ServiceBusReceivedMessageContext.java:81)
        at asb2jms.ASBConsumer.lambda$getMessageProcessor$1(ASBConsumer.java:146)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:430)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:406)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
        at com.azure.messaging.servicebus.FluxAutoLockRenew$LockRenewSubscriber.hookOnNext(FluxAutoLockRenew.java:184)
        at com.azure.messaging.servicebus.FluxAutoLockRenew$LockRenewSubscriber.hookOnNext(FluxAutoLockRenew.java:82)
        at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
        at com.azure.messaging.servicebus.FluxTrace$TracingSubscriber.hookOnNext(FluxTrace.java:67)
        at com.azure.messaging.servicebus.FluxTrace$TracingSubscriber.hookOnNext(FluxTrace.java:35)
        at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
        at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.drainQueue(ServiceBusReceiveLinkProcessor.java:518)
        at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.drain(ServiceBusReceiveLinkProcessor.java:477)
        at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.lambda$onNext$2(ServiceBusReceiveLinkProcessor.java:229)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        ... 5 more
Caused by: com.azure.core.amqp.exception.AmqpException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.

To Reproduce Connect to an ASB broker in a remote location and receive very large messages in PEEK_LOCK mode.

Code Snippet

ServiceBusProcessorClient processor;
        processor = new ServiceBusClientBuilder()
                .credential(credential)
                .fullyQualifiedNamespace(Conf.FROM.NAMESPACE)
                .processor()
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .queueName(Conf.FROM.QUEUE_NAME)
                .prefetchCount(1)
                .disableAutoComplete()
                .processError(errorContext -> {
                    log.error(ExceptionUtils.getStackTrace(errorContext.getException()));
                })
                .processMessage(messageContext -> {
                    var message = messageContext.getMessage();
                    log.info("consumer -> received message {} of type {}", message.getMessageId(), message.getSubject());
                    log.info("consumer -> lock of message {} expiring at {}", message.getMessageId(), message.getLockedUntil());
                    try {
                        // some processing here, takes ~50 ms
                        messageContext.complete();
                    } catch (InterruptedException | TimeoutException e) {
                        log.warn("interrupted | timeout in servicebus processor");
                        messageContext.abandon();
                    } catch (Exception e) {
                        log.error("processing message {} of type {} failed", message.getMessageId(), message.getSubject());
                        log.error(ExceptionUtils.getStackTrace(e));
                    }
                })
                .buildProcessorClient();

Expected behavior ASB should either allow renewing locks after they expire or transport messages more efficiently. The ASB SDK shouldn't attempt to renew a lock that has already expired; instead, it should report that the message can't be completed.


Additional context The message broker in question is the public broker of Elexon's IRIS.


anuchandy commented 10 months ago

Hello, I will follow up with you shortly.

anuchandy commented 10 months ago

Considering the size of the messages and where the broker (Service Bus) and the client are hosted, you will want to increase the lock duration - e.g., via portal.azure.com it can be set to up to 5 minutes. This is the maximum duration for which the broker locks the message once it puts it on the wire.

It’s possible that by the time the message has arrived and been assembled at the client, the lock at the broker has already expired. The client cannot request an extension of a lock that has already expired.
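
For reference, a minimal sketch of how the queue's lock duration could be raised with the Service Bus administration client, assuming you (or whoever owns the namespace) have management rights; the connection string and queue name below are placeholders:

import java.time.Duration;

import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.QueueProperties;

public class RaiseLockDuration {
    public static void main(String[] args) {
        // Placeholder values - only the namespace owner can run this against the broker.
        String connectionString = System.getenv("SERVICEBUS_CONNECTION_STRING");
        String queueName = "my-queue";

        ServiceBusAdministrationClient adminClient = new ServiceBusAdministrationClientBuilder()
                .connectionString(connectionString)
                .buildClient();

        // Read the current queue settings and raise the lock duration to the 5-minute maximum.
        QueueProperties properties = adminClient.getQueue(queueName);
        properties.setLockDuration(Duration.ofMinutes(5));
        adminClient.updateQueue(properties);
    }
}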

s17b2-voroneckij commented 10 months ago

@anuchandy, thanks for your answer. Unfortunately, this doesn't fully solve the problem:

  1. It doesn't make sense that transferring 25 MB of data takes more than a minute. Given that I've been running the consumers in the cloud, the download speed shouldn't be this slow.
  2. Unfortunately, in this case the ASB broker is provided by a third party, and I can't set the lock timeout myself.
  3. Even if I were able to do that, the problem would still appear if the message were 5 times larger.
  4. Also, if receiving one message takes more than a minute, the delivery of other messages is delayed, which is not desirable.
  5. I can see in the logs that the ASB client library tries to renew the expired lock and fails - a pointless operation that could be removed.
  6. When I was debugging this issue in Python, it appeared that once the lock expires and I request messages again, the client again takes more than a minute to get the very same message (correct me if I am wrong). This doesn't seem efficient - what is the point of transferring the message again if it has already been transferred once?

Also, do you happen to know whether I would face this problem if I used JMS to connect to Service Bus instead of this SDK?

anuchandy commented 10 months ago

I tested by deploying the consumer (in a VM) and Service Bus in two cross-region setups –

  1. Cross-Regions-Pair-A: (Iowa/EastUS, Singapore/SouthEastAsia)
  2. Cross-Regions-Pair-B: (London/UKSouth, UAE North)

In terms of great-circle distance, the regions in Cross-Regions-Pair-A are farther apart than those in Cross-Regions-Pair-B. However, in my testing, receiving a 20 MB message in the Cross-Regions-Pair-A case takes ~3 seconds on average, while in the Cross-Regions-Pair-B case it takes ~1 minute on average, which matches your observation. I can also see that in Cross-Regions-Pair-B, tracert (run in the shell) against the DNS name in the other region frequently times out, whereas for Cross-Regions-Pair-A it was able to trace the hops.

At this point it’s fairly certain that this is network latency, which manifests as slowness in the libraries. Observing the same delay irrespective of the library (Python, Java) is a data point supporting this.

While these are the observations, officially root-causing network or infrastructure latency is outside the purview of the library team. I recommend you open a support ticket referring to the VM in the UAE Azure data center in your subscription. The Support Engineer owning the ticket will then coordinate with the backend teams (e.g., compute/network/Service Bus) to RCA any latency in the infrastructure they manage.

Answering your questions -

Regarding the first 4 questions – as we can see, those concerns are side effects of the external network latency.

Regarding the 5th question – we are improving the lock renewal logic so that there will be less of that logging. But in general the library relies on the broker (Service Bus) to decide whether a lock renewal can be done, as this keeps the client implementation simple and stable (it avoids dealing with clock skew, etc.), and the renewal request is a small payload that is cheap to send since the underlying connection is already established.
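
As a side note, the processor's client-side auto lock renewal window can be capped or switched off entirely through maxAutoLockRenewDuration on the processor builder; a minimal sketch, reusing the placeholder names from the snippet above (Duration here is java.time.Duration, and Duration.ZERO disables auto-renewal):

ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
        .credential(credential)
        .fullyQualifiedNamespace(Conf.FROM.NAMESPACE)
        .processor()
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .queueName(Conf.FROM.QUEUE_NAME)
        .prefetchCount(1)
        .disableAutoComplete()
        // Duration.ZERO turns off background lock renewal; a non-zero value caps
        // how long the client keeps renewing the lock for an in-flight message.
        .maxAutoLockRenewDuration(Duration.ZERO)
        .processMessage(messageContext -> { /* process and complete/abandon here */ })
        .processError(errorContext -> { /* log the error here */ })
        .buildProcessorClient();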

Regarding the 6th question – this is expected in PEEK_LOCK mode, where the Service Bus broker redelivers the message until it is completed or the number of delivery attempts exceeds the max delivery count. So this is the expected behavior of the Python SDK.

I don’t have enough insight into the JMS client or its underlying protocol, but I would expect the network latency to impact any client.

s17b2-voroneckij commented 10 months ago

@anuchandy, thanks for your effort in looking into this issue. I really appreciate it. In case anyone else comes across this issue, here are some other observations I made:

  1. If JMS is used, the same problem exists, but it manifests slightly differently. Receiving a large message still takes a long time and the consumer isn't able to remove it from the queue, but somehow it doesn't get any exceptions. I don't know why.
  2. Doing some tests with the Python SDK, I observed that if I set transport_type to "AMQP over websockets", then message delivery is significantly faster. I was able to receive and complete a message of size ~85 MB from UK South in the UAE within a minute. I hope this will solve the issue I am facing.
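
For anyone wanting to try the same transport change from the Java SDK, the client builder exposes a transport type setting; a minimal sketch, again reusing the placeholder names from the snippet above:

ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
        .credential(credential)
        .fullyQualifiedNamespace(Conf.FROM.NAMESPACE)
        // AMQP over WebSockets runs over port 443, mirroring the Python transport_type change above.
        .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
        .processor()
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .queueName(Conf.FROM.QUEUE_NAME)
        .disableAutoComplete()
        .processMessage(messageContext -> { /* process and complete/abandon here */ })
        .processError(errorContext -> { /* log the error here */ })
        .buildProcessorClient();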