Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.25k stars 1.93k forks source link

[BUG] Reactor IllegalStateException thrown when using StreamBridge with Spring Cloud Azure Stream Binder #35215

Open itsmariodias opened 1 year ago

itsmariodias commented 1 year ago

Describe the bug When using StreamBridge to publish messages to Azure Eventhubs in a reactive chain (.map, .flatMap, .doOnSuccess, etc.), an IllegalStateException is thrown by Reactor indicating that .block() call is used in the chain. The offending area is at line 106 in the com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.doSend method.

Exception or Stack Trace

2023-05-31 11:20:24.976 ERROR 8032 --- [ctor-http-nio-3] c.a.s.m.e.core.EventHubsTemplate         : EventDataBatch create error.

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    at reactor.core.publisher.Mono.block(Mono.java:1742)
    at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.doSend(EventHubsTemplate.java:106)
    at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:60)
    at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:97)
    at com.azure.spring.integration.core.handler.DefaultMessageHandler.handleMessageInternal(DefaultMessageHandler.java:86)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:239)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:171)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:151)
    at com.sample.eventhubs.rest.EventhubsController.lambda$publishMessage$0(EventhubsController.java:27)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.Operators.complete(Operators.java:137)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:121)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
    at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1002)
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:707)
    at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:481)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:621)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:230)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

To Reproduce Steps to reproduce the behavior:

  1. Publish a message via StreamBridge.send in any reactive chain.

Code Snippet RestController.java

@PostMapping("/publishMessage")
public Mono<ResponseEntity<String>> publishMessage() {
    return eventhubsService.someMethod()
            .doOnSuccess(message -> {
                log.info("Sending message: {}", message);
                streamBridge.send("supply-out-0", message);
                log.info("Sent {}.", message);
            }).map(ResponseEntity::ok);
}

application.yaml

spring:
  cloud:
    azure:
      eventhubs:
        namespace: ${AZURE_EVENTHUBS_NAMESPACE}
    stream:
      output-bindings: supply
      bindings:
        supply-out-0:
          destination: ${AZURE_EVENTHUB_NAME}
      eventhubs:
        bindings:
          supply-out-0:
            producer:
              sync: true
      default:
        producer:
          errorChannelEnabled: true

Expected behavior The message should be published to eventhub without any errors.

Screenshots N/A

Setup (please complete the following information):

Additional context This seems to be identical to the #12500. When adding .publishOn(Schedulers.boundedElastic()) in the reactive chain before the streamBridge call the issue is resolved, however this ought to be done in the library itself.

Information Checklist

itsmariodias commented 1 year ago

Closed by mistake, reopened it now.

joshfree commented 1 year ago

Thank you for the detailed issue, @itsmariodias! @conniey @Azure/azsdk-sb-java can you please take a look?

anuchandy commented 1 year ago

Hi Yi, this is due to this line and this line in our spring layer, we may want to make the first batch creation as part of the async-chain.

anuchandy commented 1 year ago

@yiliuTo I was thinking something like this https://gist.github.com/anuchandy/9b46fb005e783f16fad38e04c1f7ed4a . I didn't test it but maybe useful as a reference or there exists a better way to do it.

itsmariodias commented 1 year ago

Is there any activity on this? The Additional Context provided should be useful in debugging and provide the correct fix for the same.

itsmariodias commented 9 months ago

Hi, this seems to still exist in v5.5.0, any timeline on when this would be fixed? @joshfree @anuchandy @conniey @yiliuTo

conniey commented 8 months ago

@yiliuTo Please feel free to redirect this if you do not own the event hubs spring binder.

AlanKrueger commented 2 weeks ago

This seems like the same issue as #31358. Is there a known workaround for this?