Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] [Azure OpenAI] block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2 #40898

Open nspyke opened 5 months ago

nspyke commented 5 months ago

Describe the bug: In a Spring Boot WebFlux project, calling the stream method of Spring AI, which in turn calls the Azure OpenAI SDK, throws the exception: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

Exception or Stack Trace

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
    Suppressed: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.7.jar:3.6.7]
        at com.azure.core.http.netty.NettyAsyncHttpClient.sendSync(NettyAsyncHttpClient.java:198) ~[azure-core-http-netty-1.15.0.jar:1.15.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.HttpLoggingPolicy.processSync(HttpLoggingPolicy.java:175) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.policy.InstrumentationPolicy.processSync(InstrumentationPolicy.java:101) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.KeyCredentialPolicy.processSync(KeyCredentialPolicy.java:115) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.CookiePolicy.processSync(CookiePolicy.java:73) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.AddDatePolicy.processSync(AddDatePolicy.java:50) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RetryPolicy.attemptSync(RetryPolicy.java:211) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RetryPolicy.processSync(RetryPolicy.java:161) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.AddHeadersFromContextPolicy.processSync(AddHeadersFromContextPolicy.java:67) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RequestIdPolicy.processSync(RequestIdPolicy.java:77) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.HttpPipelineSyncPolicy.processSync(HttpPipelineSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.UserAgentPolicy.processSync(UserAgentPolicy.java:174) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:138) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.SyncRestProxy.send(SyncRestProxy.java:62) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:83) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.RestProxyBase.invoke(RestProxyBase.java:125) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.rest.RestProxy.invoke(RestProxy.java:97) ~[azure-core-1.49.0.jar:1.49.0]
        at jdk.proxy2/jdk.proxy2.$Proxy55.getChatCompletionsSync(Unknown Source) ~[na:na]
        at com.azure.ai.openai.implementation.OpenAIClientImpl.getChatCompletionsWithResponse(OpenAIClientImpl.java:1444) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at com.azure.ai.openai.OpenAIClient.getChatCompletionsWithResponse(OpenAIClient.java:318) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at com.azure.ai.openai.OpenAIClient.getChatCompletionsStream(OpenAIClient.java:732) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at org.springframework.ai.azure.openai.AzureOpenAiChatModel.stream(AzureOpenAiChatModel.java:165) ~[spring-ai-azure-openai-1.0.0-M1.jar:1.0.0-M1]
        at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.doGetFluxChatResponse(ChatClient.java:692) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
        at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.chatResponse(ChatClient.java:707) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
        at nz.co.airnz.aiplatform.conversations.api.resource.ChatResource.stream(ChatResource.java:30) ~[main/:na]
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198) ~[spring-webflux-6.1.10.jar:6.1.10]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:818) ~[reactor-netty-http-1.1.20.jar:1.1.20]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:305) ~[reactor-netty-http-1.1.20.jar:1.1.20]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
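
The top frames of the trace show Mono.block being called from NettyAsyncHttpClient.sendSync while running on reactor-http-nio-2, a server event-loop thread. As far as I understand Reactor, it refuses to block on threads it has marked as non-blocking. The following is a minimal stdlib-only sketch of that kind of guard, not Reactor's actual code. The names NonBlockingMarker, EventLoopThread, blockingGet and attempt are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class NonBlockingGuardSketch {

    /** Hypothetical marker interface identifying threads that must never block. */
    interface NonBlockingMarker {}

    /** Hypothetical event-loop thread type that carries the marker. */
    static final class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Waits for the future, but refuses to do so on a marked thread. */
    static <T> T blockingGet(CompletableFuture<T> future) {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                "blocking is not supported in thread " + current.getName());
        }
        return future.join();
    }

    /** Runs blockingGet on the chosen kind of thread and reports the outcome. */
    static String attempt(boolean onEventLoop) {
        String[] outcome = new String[1];
        Runnable task = () -> {
            try {
                outcome[0] = blockingGet(CompletableFuture.completedFuture("ok"));
            } catch (IllegalStateException e) {
                outcome[0] = e.getMessage();
            }
        };
        Thread thread = onEventLoop
            ? new EventLoopThread(task, "sketch-nio-1")
            : new Thread(task, "sketch-worker-1");
        thread.start();
        try {
            thread.join(); // join makes the write to outcome[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(attempt(true));  // rejected on the marked thread
        System.out.println(attempt(false)); // allowed on an ordinary thread
    }
}
```

The same blocking call succeeds or fails depending only on which thread runs it, which would be consistent with the synchronous client path failing when it is invoked from a WebFlux handler.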

To Reproduce: Create a new Spring Boot 3.3.1 project through Spring Initializr, add the Spring AI and Azure OpenAI dependencies, and configure them in application.properties for autoconfiguration.

Code Snippet

@RestController
public class ChatResource {
    private final ChatClient chatClient;

    public ChatResource(ChatClient chatClient) {
        this.chatClient = chatClient;
    }
    @PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> stream(@RequestBody String input) {
        return chatClient.prompt().user(input).stream().chatResponse();
    }
}

Expected behavior Azure OpenAI chat streams a response


joshfree commented 5 months ago

@mssfang could you please assist @nspyke

nspyke commented 5 months ago

Hi @joshfree, thanks for picking this up so quickly.

I have done a bit more investigation, this time using the Azure OpenAI SDK directly and removing the Spring AI library. I've found that the stream does work in that setup, even though the stack trace makes it look like the issue is coming from deep inside the Azure library.

@Bean
public OpenAIAsyncClient chatClient() {
    return new OpenAIClientBuilder()
            .credential(new AzureKeyCredential("my-az-open-ai-key"))
            .endpoint("https://my-endpoint.openai.azure.com")
            .buildAsyncClient();
}

And this code in the controller

@PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatCompletions> stream(@RequestBody String input) {
    List<ChatRequestMessage> chatMessages = new ArrayList<>();
    chatMessages.add(new ChatRequestSystemMessage("You are a helpful assistant. You will talk like a pirate."));
    chatMessages.add(new ChatRequestUserMessage(input));

    return chatClient.getChatCompletionsStream(
            "gpt-4",
            new ChatCompletionsOptions(chatMessages));
}

nspyke commented 5 months ago

Back in Spring AI, stepping through with the debugger shows that it calls com.azure.ai.openai.implementation.OpenAIClientImpl#getChatCompletionsWithResponse, which in turn calls the interface stub com.azure.ai.openai.implementation.OpenAIClientImpl.OpenAIClientService#getChatCompletionsSync:

Response<BinaryData> getChatCompletionsSync(@HostParam("endpoint") String var1, @QueryParam("api-version") String var2, @PathParam("deploymentId") String var3, @HeaderParam("accept") String var4, @BodyParam("application/json") BinaryData var5, RequestOptions var6, Context var7);

This looks like a potential cause of the blocking issue.

I would have expected it to call com.azure.ai.openai.implementation.OpenAIClientImpl#getChatCompletionsWithResponseAsync and com.azure.ai.openai.implementation.OpenAIClientImpl.OpenAIClientService#getChatCompletions with the Mono<> return type.
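
For illustration only: when a synchronous call cannot be swapped for its async counterpart, a common way to keep it off the event loop is to hand it to a worker thread and return a handle to the pending result. Nobody in this thread has confirmed that as the fix. The sketch below uses only the JDK, and blockingCall, offload and demo are hypothetical names. In Reactor, the comparable move would be wrapping the call and applying subscribeOn(Schedulers.boundedElastic()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadBlockingCallSketch {

    /** Hypothetical stand-in for a synchronous client call that blocks its caller. */
    static String blockingCall(String prompt) {
        try {
            Thread.sleep(50); // simulates waiting on network I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "echo:" + prompt + "@" + Thread.currentThread().getName();
    }

    /** Hands the blocking call to a worker pool and returns immediately. */
    static CompletableFuture<String> offload(String prompt, ExecutorService workers) {
        return CompletableFuture.supplyAsync(() -> blockingCall(prompt), workers);
    }

    /** Runs one offloaded call and waits for it from the calling (non-event-loop) thread. */
    static String demo() {
        ExecutorService workers =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-worker"));
        try {
            return offload("hi", workers).join();
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The thread name embedded in the result shows that the blocking work ran on the worker pool rather than on the caller's thread.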

nspyke commented 5 months ago

Duplicated this issue in Spring AI