Open mohyeid opened 7 months ago
Hi @mohyeid, thanks for reaching out. Could you provide a minimal project that reproduces the issue?
Here you go: https://github.com/mohyeid/azure-sdk-processor-issue
Please note that tenantId, clientId, secret, proxy, etc. are fake and for sample code only. You can debug the issue by setting the following breakpoints and noticing the differences:
AbstractServiceBusSubClientBuilderFactory:38: on the first run (sender) and the second run (receiver) it uses the shared client; on the third run (processor) it creates a new instance. Breakpoints at:
Other helpful breakpoints: ReactorHandlerProvider:57
Hope this helps, let me know if I can provide any extra details.
I am not sure whether I need to open another ticket, but this is 100% related. Because the processor does not use the shared client, it cannot pick up the proxy, custom-endpoint, etc. I also found that it cannot set or use the correct topic name provided in the configuration. Here is the error:
com.azure.messaging.servicebus.ServiceBusException: The messaging entity 'sb://msdtestmohy.servicebus.windows.net/consumerFunction-in-0' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:aad4b4a3-6d1f-4ea0-95f2-5cbb62c46ede_G14, SystemTracker:msdtestmohy.servicebus.windows.net:consumerFunction-in-0, Timestamp:2023-12-08T06:15:30, errorContext[NAMESPACE: msdtestmohy.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.mapError(ServiceBusReceiverAsyncClient.java:1790) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoError] :
Error has been observed at the following site(s):
*__Flux.onErrorMap ⇢ at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.receiveMessagesWithContext(ServiceBusReceiverAsyncClient.java:995)
Original Stack Trace:
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.mapError(ServiceBusReceiverAsyncClient.java:1790) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$receiveMessagesWithContext$21(ServiceBusReceiverAsyncClient.java:995) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:7070) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.doError(FluxPublishOn.java:511) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.checkTerminated(FluxPublishOn.java:549) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:432) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onError(FluxPublishOn.java:248) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.messaging.servicebus.FluxAutoComplete$AutoCompleteSubscriber.hookOnError(FluxAutoComplete.java:120) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.messaging.servicebus.FluxAutoLockRenew$LockRenewSubscriber.hookOnError(FluxAutoLockRenew.java:130) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.messaging.servicebus.FluxTrace$TracingSubscriber.hookOnError(FluxTrace.java:81) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onError(ServiceBusReceiveLinkProcessor.java:370) ~[azure-messaging-servicebus-7.14.5.jar:7.14.5]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2065) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:56) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoWhen$WhenCoordinator.signalError(MonoWhen.java:172) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoWhen$WhenInner.onError(MonoWhen.java:288) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2065) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:198) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.core.amqp.implementation.RequestResponseChannel.settleMessage(RequestResponseChannel.java:416) ~[azure-core-amqp-2.8.11.jar:2.8.11]
at com.azure.core.amqp.implementation.RequestResponseChannel.lambda$new$0(RequestResponseChannel.java:189) ~[azure-core-amqp-2.8.11.jar:2.8.11]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.4.29.jar:3.4.29]
at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:173) ~[azure-core-amqp-2.8.11.jar:2.8.11]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:na]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.8.11.jar:2.8.11]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.29.jar:3.4.29]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.29.jar:3.4.29]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'sb://msdtestmohy.servicebus.windows.net/consumerFunction-in-0' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:aad4b4a3-6d1f-4ea0-95f2-5cbb62c46ede_G14, SystemTracker:msdtestmohy.servicebus.windows.net:consumerFunction-in-0, Timestamp:2023-12-08T06:15:30, errorContext[NAMESPACE: msdtestmohy.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]
at com.azure.core.amqp.implementation.ExceptionUtil.distinguishNotFound(ExceptionUtil.java:123) ~[azure-core-amqp-2.8.11.jar:2.8.11]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoHandleFuseable] :
reactor.core.publisher.Mono.handle(Mono.java:3248)
com.azure.core.amqp.implementation.ClaimsBasedSecurityChannel.lambda$authorize$2(ClaimsBasedSecurityChannel.java:68)
Describe the bug: Today, with Spring Boot / Spring Cloud auto-configuration, the sub and producer factories are able to reuse the preconfigured serviceBusClientBuilder and mark shareServiceBusClientBuilder to true, like here: https://github.com/Azure/azure-sdk-for-java/blob/2b020724483dbb745531c113d78c4d900fcb52e4/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactory.java#L55
On the other hand, DefaultServiceBusNamespaceProcessorFactory creates its processor builder factory without passing any serviceBusClientBuilder instance, which results in a new instance that lacks all of the configuration applied for the sub and producers, such as proxy and custom-endpoint-address customization (reference issue: https://github.com/Azure/azure-sdk-for-java/issues/37930#issue-2026925790): https://github.com/Azure/azure-sdk-for-java/blob/2b020724483dbb745531c113d78c4d900fcb52e4/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProcessorFactory.java#L189C40-L189C40
This line calls ServiceBusProcessorClientBuilderFactory without a serviceBusClientBuilder, which in turn calls another constructor that passes null for serviceBusClientBuilder, resulting in a new instance that lacks any configuration.
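To make the constructor chain described above concrete, here is a minimal sketch of the pattern. It does not use the Azure SDK: `ClientBuilder` and `SubClientFactory` are hypothetical stand-ins I made up for illustration, and the proxy host is a fake value. The only point is that a no-argument constructor delegating with null silently falls back to a fresh, unconfigured builder.

```java
public class ProcessorFactoryNullBuilderSketch {

    /** Hypothetical stand-in for a client builder; it only tracks a proxy setting. */
    static final class ClientBuilder {
        private String proxyHost; // null means "no proxy configured"

        ClientBuilder proxy(String host) {
            this.proxyHost = host;
            return this;
        }

        String proxyHost() {
            return proxyHost;
        }
    }

    /** Hypothetical stand-in for a sub-client builder factory. */
    static final class SubClientFactory {
        private final ClientBuilder builder;
        private final boolean shared;

        // No builder supplied: delegate with null, as described in the issue.
        SubClientFactory() {
            this(null);
        }

        SubClientFactory(ClientBuilder preconfigured) {
            this.shared = preconfigured != null;
            // With null, fall back to a fresh builder that knows nothing about the proxy.
            this.builder = shared ? preconfigured : new ClientBuilder();
        }

        boolean usesSharedBuilder() {
            return shared;
        }

        String effectiveProxy() {
            return builder.proxyHost();
        }
    }

    public static void main(String[] args) {
        ClientBuilder configured = new ClientBuilder().proxy("proxy.example.test");

        SubClientFactory sender = new SubClientFactory(configured); // reuses the shared builder
        SubClientFactory processor = new SubClientFactory();        // gets a fresh builder

        System.out.println("sender proxy:    " + sender.effectiveProxy());
        System.out.println("processor proxy: " + processor.effectiveProxy());
    }
}
```

In this sketch the sender reports the configured proxy while the processor reports none, which mirrors the difference seen at the breakpoints.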
Exception or Stack Trace: The missing configuration results in an exception when creating the processor connection, even though the sub and producer connections are created successfully.
To Reproduce: Steps provided above. This is for Spring Cloud Stream auto-configuration.
Code Snippet: Added above.
Expected behavior: The processor factory needs to be able to use the preconfigured serviceBusClientBuilder, similar to the sub and producer factories.
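One possible shape for the expected behavior, sketched without the Azure SDK. All class names here (`ClientBuilder`, `ProcessorBuilderFactory`, `NamespaceProcessorFactory`) are hypothetical stand-ins and the endpoint is a fake value; this is an illustration of the idea under those assumptions, not the library's actual fix. The namespace-level factory keeps a reference to the preconfigured builder and hands it to every processor factory it creates.

```java
public class SharedBuilderProcessorSketch {

    /** Hypothetical stand-in for a client builder; it only tracks a custom endpoint. */
    static final class ClientBuilder {
        private String customEndpoint; // null means "use the default endpoint"

        ClientBuilder customEndpoint(String endpoint) {
            this.customEndpoint = endpoint;
            return this;
        }

        String customEndpoint() {
            return customEndpoint;
        }
    }

    /** Hypothetical processor-level factory that accepts a shared builder. */
    static final class ProcessorBuilderFactory {
        private final ClientBuilder builder;

        ProcessorBuilderFactory(ClientBuilder shared) {
            // Reuse the shared builder when present; create a fresh one only as a fallback.
            this.builder = shared != null ? shared : new ClientBuilder();
        }

        ClientBuilder builder() {
            return builder;
        }
    }

    /** Hypothetical namespace-level factory that forwards the shared builder. */
    static final class NamespaceProcessorFactory {
        private final ClientBuilder shared;

        NamespaceProcessorFactory(ClientBuilder shared) {
            this.shared = shared;
        }

        ProcessorBuilderFactory createProcessorFactory() {
            // Pass the preconfigured builder through instead of omitting it.
            return new ProcessorBuilderFactory(shared);
        }
    }

    public static void main(String[] args) {
        ClientBuilder configured = new ClientBuilder().customEndpoint("https://endpoint.example.test");

        NamespaceProcessorFactory namespaceFactory = new NamespaceProcessorFactory(configured);
        ProcessorBuilderFactory processorFactory = namespaceFactory.createProcessorFactory();

        System.out.println("same builder instance: " + (processorFactory.builder() == configured));
        System.out.println("processor endpoint:    " + processorFactory.builder().customEndpoint());
    }
}
```

Forwarding the same instance, rather than copying individual settings, means any customization applied to the shared builder is visible to the processor as well.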