Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] ServiceBus SDK constantly fails on complete() action with invalid lock, even for no-op subscribers #20640

Closed · jfurmankiewiczpros closed this issue 3 years ago

jfurmankiewiczpros commented 3 years ago

Describe the bug

We ported to the new SDK and are running some long-running perf tests to see how it is doing. Our logs are flooded with constant exceptions like the one below when we call complete() on the message context:


com.azure.messaging.servicebus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:ae4628ac-df69-4ae3-a0e0-2ad31ced38c1, TrackingId:00955d4d0002000600000943607636a0_G5_B45, SystemTracker:mq4az-gocd-pipeline:Topic:mq4az_perftest_standard|perm_default, Timestamp:2021-04-14T00:32:10, errorContext[NAMESPACE: mq4az-gocd-pipeline.servicebus.windows.net, PATH: **************/subscriptions/perm_default, REFERENCE_ID: mq4az_perftest_standard/subscriptions/perm_default_cbff0a_1618359968251, LINK_CREDIT: 0]
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$43(ServiceBusReceiverAsyncClient.java:1155)
    at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3384)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1862)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:243)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
    at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.completeWorkItem(ServiceBusReactorReceiver.java:367)
    at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.updateOutcome(ServiceBusReactorReceiver.java:303)
    at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.decodeDelivery(ServiceBusReactorReceiver.java:215)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.SinkManyBestEffort$DirectInner.directEmitNext(SinkManyBestEffort.java:356)
    at reactor.core.publisher.DirectProcessor.tryEmitNext(DirectProcessor.java:233)
    at reactor.core.publisher.DirectProcessor.emitNext(DirectProcessor.java:192)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:217)
    at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
    at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:125)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:82)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Mono.block(Mono.java:1703)
        at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.complete(ServiceBusReceivedMessageContext.java:56)
        at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.lambda$accept$2(SubscriptionMessageTypeHandler.java:79)
        at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.execSafely(SubscriptionMessageTypeHandler.java:97)
        at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.accept(SubscriptionMessageTypeHandler.java:79)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:219)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:196)
        at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
        at reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.drainLoop(ParallelMergeSequential.java:278)
        at reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.onNext(ParallelMergeSequential.java:211)
        at reactor.core.publisher.ParallelMergeSequential$MergeSequentialInner.onNext(ParallelMergeSequential.java:400)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        ... 5 more

We create the processor in a standard way:

this.subscriptionProcessor =
            sharedConnectionBuilder
                .processor()
                .topicName(configuration.getTopic())
                .subscriptionName(sub.getSubscriptionName())
                .prefetchCount(160)
                .maxConcurrentCalls(160)
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .processMessage(subscriptionMessageTypeHandler::accept)
                .processError(new MessageProcessorErrorHandler())
                .disableAutoComplete() // we do manual error handling
                .buildProcessorClient();

Notice there is no option in the processor configuration to specify the message lock duration. We presume it uses some built-in default we can't override.

Our actual subscriber processing logic is basically a no-op: it verifies we got the message and logs a statement, nothing else, so message processing is essentially instant.
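
For reference, a minimal sketch of what the handler does (illustrative only; the class and logger names here are made up, and the real SubscriptionMessageTypeHandler just logs and calls complete()):

import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.util.function.Consumer;
import java.util.logging.Logger;

public class NoOpSubscriptionHandler implements Consumer<ServiceBusReceivedMessageContext> {
    private static final Logger LOG = Logger.getLogger(NoOpSubscriptionHandler.class.getName());

    @Override
    public void accept(ServiceBusReceivedMessageContext context) {
        // Just confirm receipt; no real work is done on the message.
        LOG.info("Received message " + context.getMessage().getMessageId());
        // Manual settlement, because disableAutoComplete() is set on the processor.
        context.complete();
    }
}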

And yet our logs are flooded with thousands of such exceptions that the lock is expired.


jfurmankiewiczpros commented 3 years ago

We ran another test, with prefetchCount = 0 and maxConcurrentCalls = 8 (the number of cores on the server), and we got fewer of these at first, but after about 30 minutes we started to see them regularly in the logs again:

com.azure.messaging.servicebus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:cd18804a-8364-4c11-a321-48aba7c7c088, TrackingId:8871df5a000200060000094860763f60_G1_B45, SystemTracker:mq4az-gocd-pipeline:Topic:mq4az_perftest_standard|perm_default, Timestamp:2021-04-14T02:55:11, errorContext[NAMESPACE: mq4az-gocd-pipeline.servicebus.windows.net, PATH: mq4az_perftest_standard/subscriptions/perm_default, REFERENCE_ID: mq4az_perftest_standard/subscriptions/perm_default_86bf05_1618362207561, LINK_CREDIT: 0]
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$43(ServiceBusReceiverAsyncClient.java:1155)
    at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3384)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1862)

Considering the prefetch count of 0, something seems wrong here.

It looks as if the default lock duration applied by a processor is insufficient and there is no way to override it when building the processor client.
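
(For what it's worth, my understanding is that the lock duration itself is a property of the subscription entity rather than of the client, so changing it would look something like the sketch below using the administration client. This is a sketch only; the connection string lookup and entity names are placeholders, and 5 minutes is the service-side maximum as far as I know.)

import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.SubscriptionProperties;
import java.time.Duration;

ServiceBusAdministrationClient adminClient = new ServiceBusAdministrationClientBuilder()
    .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING")) // placeholder
    .buildClient();

SubscriptionProperties subscription =
    adminClient.getSubscription("mq4az_perftest_standard", "perm_default");
subscription.setLockDuration(Duration.ofMinutes(5)); // 5 minutes is the maximum Service Bus allows
adminClient.updateSubscription(subscription);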

jfurmankiewiczpros commented 3 years ago

After about 45 minutes of running with prefetchCount = 0 and maxConcurrentCalls = 8, our logs are now flooded with the same lock exception. So it took longer than when we were running with prefetchCount / maxConcurrentCalls of 160, but it ends up in the same state: logs overflowing with the same exception over and over.

joshfree commented 3 years ago

Thanks for reporting this @jfurmankiewiczpros. @YijunXieMS could you please investigate?

/cc @hemanttanwar

jfurmankiewiczpros commented 3 years ago

Hi all. I managed to capture the log of a full run that reproduces this. Attaching the log.

Here's how the test was run:

It was all slowly chugging along until around 34K messages.

Suddenly it got an invalid lock exception on line 183053 of the attached log, and after that the exception just keeps appearing over and over again, flooding the logs.

Hope this helps. Right now we are holding off on adopting the new SDK, as we have not been able to complete a single successful test run with it, so it's a blocker for us.

log.zip

jfurmankiewiczpros commented 3 years ago

Also forgot to mention we set the following properties on the processor, taking advantage of the maxAutoLockRenewDuration API added in 7.3.0-beta.1:

.maxAutoLockRenewDuration(Duration.ofMinutes(5))
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.prefetchCount(0)
.maxConcurrentCalls(8)

jfurmankiewiczpros commented 3 years ago

Hi, any updates on this? It is a critical showstopper for us; we cannot move ahead with adopting the SDK.

YijunXieMS commented 3 years ago

Hi @jfurmankiewiczpros, I'm working on this. We're targeting a release by 05/14. Does that meet your timeline?

jfurmankiewiczpros commented 3 years ago

I think we can live with that. If you could share some early alpha builds that we could test with sooner, that would be greatly appreciated. Thank you.

YijunXieMS commented 3 years ago

@jfurmankiewiczpros, appreciate you offering to help test the alpha build. Will keep you updated.

YijunXieMS commented 3 years ago

@jfurmankiewiczpros It's possible that a message sits in the cache for too long; there is no auto lock renewal yet while a message is in the cache. Prefetch is 0, so in theory this shouldn't happen. I will do a long-running test. Meanwhile, is it possible for you to re-run the test with "debug" level logs and timestamps on the log entries? That would give us more information.

jfurmankiewiczpros commented 3 years ago

Let me see. This test suite is a simple Java class with main(), so logging is just basic console output; I'll try to enable all of that and re-run the test.
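
(If it helps, my assumption is that when no SLF4J binding is on the classpath, azure-core's fallback console logger honors the AZURE_LOG_LEVEL setting, so something like the snippet below, set before any client is built, should turn on verbose output. That is an assumption about the default logger, not something I have verified.)

public static void main(String[] args) {
    // Assumption: azure-core's fallback console logger reads AZURE_LOG_LEVEL
    // (as a system property or environment variable) when no SLF4J binding exists.
    System.setProperty("AZURE_LOG_LEVEL", "verbose");
    // ... build the processor client and start the test as usual ...
}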

YijunXieMS commented 3 years ago

@jfurmankiewiczpros please forget about the debug log. I already have it.

YijunXieMS commented 3 years ago

@jfurmankiewiczpros We released azure-messaging-servicebus 7.2.1, and my long-running test no longer hit this problem. .maxAutoLockRenewDuration(Duration.ofMinutes(5)) isn't available yet in 7.2.1 because it's still in the beta version, but 7.2.1 auto-renews the lock for 5 minutes by default. So I suggest you use 7.2.1. Could you try it? Let me know if you hit any more problems.

jfurmankiewiczpros commented 3 years ago

Absolutely, let me try that tomorrow and I'll let you know.

YijunXieMS commented 3 years ago

@jfurmankiewiczpros have you tried it yet?

jfurmankiewiczpros commented 3 years ago

Yes, THAT issue seems to have gone away, but I am still not able to successfully run my long-running tests. Fetching messages one at a time is very slow, and if I increase prefetchCount I start getting some management node errors after a few minutes and everything seems to grind to a halt. I will probably open a separate issue for that once we can isolate it further.

But I am OK with closing this particular issue as solved; it seems to have gone away with the latest SDK.

YijunXieMS commented 3 years ago

Thanks for the update. I'll look into that.