Azure / azure-service-bus-java

☁️ Java client library for Azure Service Bus
https://azure.microsoft.com/services/service-bus
MIT License
60 stars 59 forks source link

Error processing ASB queues with Spring Boot and Apache Camel #423

Closed SofiaVynnytska12 closed 2 years ago

SofiaVynnytska12 commented 2 years ago

We use Spring Boot (v.3.7.1), apache camel (v.3.18.1), camel-azure-servicebus-starter (v.3.18.1) ; The connection string is correct and has valid format; Queues are created like this : azure-servicebus:queueName?connectionString=ourConnectionString;

While trying to create camel routes to azure service bus queues we are receiving an exception:

[org.apache.camel.component.azure.servicebus.ServiceBusConsumer:214] (reactor-executor-4) Error processing exchange. Exchange[]. Caused by: [com.azure.messaging.servicebus.ServiceBusException - status-code: 401, status-description: InvalidSignature: The token has an invalid signature., errorContext[NAMESPACE: ournamespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]] com.azure.messaging.servicebus.ServiceBusException: status-code: 401, status-description: InvalidSignature: The token has an invalid signature., errorContext[NAMESPACE: ournamespace.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0] at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.mapError(ServiceBusReceiverAsyncClient.java:1571) at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$receiveMessagesWithContext$18(ServiceBusReceiverAsyncClient.java:817) at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:6908) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) at com.azure.messaging.servicebus.FluxAutoComplete$AutoCompleteSubscriber.hookOnError(FluxAutoComplete.java:117) at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180) at com.azure.messaging.servicebus.FluxAutoLockRenew$LockRenewSubscriber.hookOnError(FluxAutoLockRenew.java:124) at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180) at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onError(ServiceBusReceiveLinkProcessor.java:358) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225) at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) at reactor.core.publisher.Operators.error(Operators.java:198) at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) at reactor.core.publisher.Mono.subscribe(Mono.java:4397) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234) at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) at reactor.core.publisher.Operators.error(Operators.java:198) at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:56) at reactor.core.publisher.Mono.subscribe(Mono.java:4397) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) at reactor.core.publisher.MonoWhen$WhenCoordinator.signalError(MonoWhen.java:172) at reactor.core.publisher.MonoWhen$WhenInner.onError(MonoWhen.java:288) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:210) at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:132) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172) at com.azure.core.amqp.implementation.RequestResponseChannel.settleMessage(RequestResponseChannel.java:398) at com.azure.core.amqp.implementation.RequestResponseChannel.lambda$new$0(RequestResponseChannel.java:180) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:164) at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185) at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: com.azure.core.amqp.exception.AmqpException: status-code: 401, status-description: InvalidSignature: The token has an invalid signature., errorContext[NAMESPACE: ournamespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0] at com.azure.core.amqp.implementation.ExceptionUtil.amqpResponseCodeToException(ExceptionUtil.java:114) at com.azure.core.amqp.implementation.ClaimsBasedSecurityChannel.lambda$authorize$0(ClaimsBasedSecurityChannel.java:74) at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:110)