spring-projects / spring-ai

An Application Framework for AI Engineering
https://docs.spring.io/spring-ai/reference/index.html
Apache License 2.0
3.3k stars 846 forks source link

[BUG] [Azure OpenAI] block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2 #981

Closed nspyke closed 1 month ago

nspyke commented 4 months ago

Duplicate from Azure SDK as I'm not sure where the issue lies exactly as while the exception is triggered in the Azure Open AI SDK, using the SDK directly does not cause this issue.

Azure/azure-sdk-for-java #40898

Bug description In a Spring Boot WebFlux project, calling the stream method of Spring AI which then calls the Azure OpenAI SDK, it throws the exception block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

Exception or Stack Trace

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
    Suppressed: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.7.jar:3.6.7]
        at com.azure.core.http.netty.NettyAsyncHttpClient.sendSync(NettyAsyncHttpClient.java:198) ~[azure-core-http-netty-1.15.0.jar:1.15.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.HttpLoggingPolicy.processSync(HttpLoggingPolicy.java:175) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.policy.InstrumentationPolicy.processSync(InstrumentationPolicy.java:101) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.KeyCredentialPolicy.processSync(KeyCredentialPolicy.java:115) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.CookiePolicy.processSync(CookiePolicy.java:73) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.AddDatePolicy.processSync(AddDatePolicy.java:50) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RetryPolicy.attemptSync(RetryPolicy.java:211) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RetryPolicy.processSync(RetryPolicy.java:161) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.AddHeadersFromContextPolicy.processSync(AddHeadersFromContextPolicy.java:67) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.RequestIdPolicy.processSync(RequestIdPolicy.java:77) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.HttpPipelineSyncPolicy.processSync(HttpPipelineSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.policy.UserAgentPolicy.processSync(UserAgentPolicy.java:174) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:138) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.SyncRestProxy.send(SyncRestProxy.java:62) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:83) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.implementation.http.rest.RestProxyBase.invoke(RestProxyBase.java:125) ~[azure-core-1.49.0.jar:1.49.0]
        at com.azure.core.http.rest.RestProxy.invoke(RestProxy.java:97) ~[azure-core-1.49.0.jar:1.49.0]
        at jdk.proxy2/jdk.proxy2.$Proxy55.getChatCompletionsSync(Unknown Source) ~[na:na]
        at com.azure.ai.openai.implementation.OpenAIClientImpl.getChatCompletionsWithResponse(OpenAIClientImpl.java:1444) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at com.azure.ai.openai.OpenAIClient.getChatCompletionsWithResponse(OpenAIClient.java:318) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at com.azure.ai.openai.OpenAIClient.getChatCompletionsStream(OpenAIClient.java:732) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
        at org.springframework.ai.azure.openai.AzureOpenAiChatModel.stream(AzureOpenAiChatModel.java:165) ~[spring-ai-azure-openai-1.0.0-M1.jar:1.0.0-M1]
        at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.doGetFluxChatResponse(ChatClient.java:692) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
        at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.chatResponse(ChatClient.java:707) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
        at com.example.aiplatform.conversations.api.resource.ChatResource.stream(ChatResource.java:30) ~[main/:na]
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198) ~[spring-webflux-6.1.10.jar:6.1.10]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:818) ~[reactor-netty-http-1.1.20.jar:1.1.20]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.20.jar:1.1.20]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:305) ~[reactor-netty-http-1.1.20.jar:1.1.20]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

To Reproduce

Environment Spring AI version 1.0.0-M1 Java version 21 Spring Boot Web Flux stater 3.3.1

Expected behavior Azure OpenAI chat streams a response

Minimal Complete Reproducible example Create a new Spring Boot 3.3.1 project through initializer, add Spring AI and Azure Open AI dependencies and configure them in application.yaml for autoconfiguration.

spring:
  application:
    name: ai-platform-conversations-api
  ai:
    azure:
      openai:
        chat:
          enabled: true
          options:
            deployment-name: gpt-35-turbo
        api-key: xxxxx
        endpoint: https://example.openai.azure.com/
        embedding:
          enabled: true
          options:
            deployment-name: text-embedding-ada-002
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
        return builder.build();
}
@RestController
public class ChatResource {
    private final ChatClient chatClient;

    public ChatResource(ChatClient chatClient) {
        this.chatClient = chatClient;
    }
    @PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> stream(@RequestBody String input) {
        return chatClient.prompt().user(input).stream().chatResponse();
    }
}
POST http://localhost:8080/stream
Content-Type: text/plain

Recite the first 6 paragraphs of war and peace
egosumcarlos commented 2 months ago

Does anyone know when this issue will be addressed?

sobychacko commented 1 month ago

@nspyke @egosumcarlos, could you confirm this issue can be reproduced on the latest snapshot? (1.0.0-SNAPSHOT). I see the issue on 1.0.0-M2 but not on the latest snapshot. Thanks!

nspyke commented 1 month ago

Hi @sobychacko Yes, I'm happy to confirm that this is fixed in the SNAPSHOT version!! :) Would it be possible to please tag a new version with this fix included? Maybe 1.0.0-M2.1 if M3 is still a while away. This bug is the only thing stopping me from adopting this amazing Spring package/library and I've been delaying the build of my OpenAI API endpoints while waiting for the fix.

egosumcarlos commented 1 month ago

Hi @nspyke, my pom.xml is this, and use the version 1.0.0-SNAPSHOT, but the issue persist, what is my error?:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.9</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ntt</groupId>
<artifactId>appgenai</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>appgenai</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
    <license/>
</licenses>
<developers>
    <developer/>
</developers>
<scm>
    <connection/>
    <developerConnection/>
    <tag/>
    <url/>
</scm>
<properties>
    <java.version>17</java.version>
    <spring-ai.version>1.0.0-SNAPSHOT</spring-ai.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-azure-openai-spring-boot-starter</artifactId>
    </dependency>
    <!--<dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-redis-store-spring-boot-starter</artifactId>
    </dependency>-->

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-bom</artifactId>
            <version>${spring-ai.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
</repositories>

Thanks!

sobychacko commented 1 month ago

@egosumcarlos, Can you ensure that you are using the 1.0.0-SNAPSHOT version of spring-ai? Maybe run a mvn depedency:tree?