quarkiverse / quarkus-langchain4j

Quarkus Langchain4j extension
https://docs.quarkiverse.io/quarkus-langchain4j/dev/index.html
Apache License 2.0
119 stars 65 forks

RegisterAiService is locked down to ChatLanguageModel #105

Open vietk opened 7 months ago

vietk commented 7 months ago

Hello,

Currently the RegisterAiService cannot find or be configured to use a StreamingChatLanguageModel; it's locked to a "blocking" ChatLanguageModel. It would be nice to have streaming here, to write chatbots with providers that support streaming of the response.

Regards

geoand commented 7 months ago

Makes sense!

geoand commented 6 months ago

Actually, this would only make sense if the response of the service is an async type of String

vietk commented 6 months ago

Yes, and apparently the underlying langchain API, TokenStream, handles only Consumer<String>.

On my side, I ran a trial with the OpenAI client, where you can hack the code so that the method returns a TokenStream.

    interface Bot {
      @SystemMessage("""
          xxx
          """)
      TokenStream chat(@UserMessage String question);
    }

If you add support for async return types (CompletableFuture/Mutiny) there, it would do the trick.

Then I needed to link it with AiServices using manual registration, and it worked well.

    return AiServices.builder(Bot.class).streamingChatLanguageModel(model).retriever(retriever).build();

In the end, a wrapper class sends back Multi<String> instead of TokenStream (which is not a very friendly API).
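
The wrapper idea mentioned above can be sketched roughly as follows. This is only an illustration using plain JDK types: CallbackTokenStream and TokenEmitter are hypothetical stand-ins for langchain4j's TokenStream and Mutiny's MultiEmitter (they are not the real APIs), and FixedTokenStream and RecordingEmitter exist only to make the sketch runnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a callback-style stream such as TokenStream.
interface CallbackTokenStream {
    CallbackTokenStream onNext(Consumer<String> handler);
    CallbackTokenStream onComplete(Runnable handler);
    CallbackTokenStream onError(Consumer<Throwable> handler);
    void start();
}

// Hypothetical stand-in for a reactive emitter such as Mutiny's MultiEmitter.
interface TokenEmitter {
    void emit(String token);
    void complete();
    void fail(Throwable failure);
}

// Fake stream that replays a fixed list of tokens when started.
final class FixedTokenStream implements CallbackTokenStream {
    private final List<String> tokens;
    private Consumer<String> onNext = t -> {};
    private Runnable onComplete = () -> {};
    private Consumer<Throwable> onError = e -> {};

    FixedTokenStream(List<String> tokens) { this.tokens = tokens; }

    public CallbackTokenStream onNext(Consumer<String> h) { onNext = h; return this; }
    public CallbackTokenStream onComplete(Runnable h) { onComplete = h; return this; }
    public CallbackTokenStream onError(Consumer<Throwable> h) { onError = h; return this; }

    public void start() {
        try {
            tokens.forEach(onNext);   // push every token to the registered handler
            onComplete.run();         // then signal completion
        } catch (RuntimeException e) {
            onError.accept(e);        // report failures through the error callback
        }
    }
}

// Emitter that simply records what it receives.
final class RecordingEmitter implements TokenEmitter {
    final List<String> tokens = new ArrayList<>();
    boolean completed;
    Throwable failure;

    public void emit(String token) { tokens.add(token); }
    public void complete() { completed = true; }
    public void fail(Throwable f) { failure = f; }
}

final class TokenStreamBridge {
    // Wires each callback of the stream to the emitter, then starts the stream.
    static void bridge(CallbackTokenStream stream, TokenEmitter emitter) {
        stream.onNext(emitter::emit)
              .onComplete(emitter::complete)
              .onError(emitter::fail)
              .start();
    }
}
```

With the real libraries, the emitter side would presumably be supplied by Multi.createFrom().emitter(...), so that start() is only called once a subscriber is present.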

vietk commented 6 months ago

By the way, are you planning to add this feature soon? I could try to contribute it; just let me know where you stand on that.

geoand commented 6 months ago

You can certainly take a swing at it if you like! I have not started looking into it.

andreadimaio commented 4 months ago

I'm implementing the StreamingChatLanguageModel for the bam and watsonx module and I noticed the problem of "locking" on the ChatLanguageModel.

Is there any news on this issue? I could help to "unlock" it 😄

vietk commented 4 months ago

Hello, I have something half done :D but haven't had time to finish it yet. So as far as I'm concerned, feel free to contribute.

andreadimaio commented 4 months ago

At first glance at the code, the "unlock" operation should be simple. What is missing is the addInjectionPoint of the StreamingChatModel into the AiServicesProcessor and also adding the injected instance into the AiServicesRecorder class (quarkusAiServices.streamingChatLanguageModel).

I need to think if it is possible to avoid using the TokenStream class. Any suggestions are appreciated.

andreadimaio commented 4 months ago

I'm going to share with you a draft version of the code to unlock the StreamingChatModel and allow the user to use Multi<String> instead of TokenStream. The idea is to share with you what I've done so far to understand if I'm on the right track or if I need to do a big rollback 💥.

You can find the commit in my repository https://github.com/andreadimaio/quarkus-langchain4j. First thing to explain: you will find a lot of changes in different classes, because I need fine-grained control over what to enable or disable in the recorder classes. In detail, I changed .enableIntegration() to .enableChatModel(), .enableStreamingChatModel() and .enableEmbeddingModel().

Now let's focus on the major changes in the core project. The first changes are in AiServicesProcessor.java, where I added the addInjectionPoint for the StreamingChatModel class, and in AiServicesRecorder.java, where I updated the context; this enables the StreamingChatModel.

To make it possible to use Multi<String>, I wrapped a TokenStream, as you can see in AiServiceMethodImplementationSupport.java:

if (returnType.equals(Multi.class)) {
    return Multi.createFrom().emitter(new Consumer<MultiEmitter<? super String>>() {

        @Override
        public void accept(MultiEmitter<? super String> em) {
            new AiServiceTokenStream(messages, context, memoryId)
                    .onNext(em::emit)
                    .onComplete(s -> em.complete())
                    .onError(em::fail)
                    .start();
        }
    });
}

I'm testing these changes with the bam module and everything seems fine except for some unusual behavior that I'd like to share with you.

1) Tests: I ran the JUnit tests in my local environment, and everything is green except for simple-ollama.

[ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:3.7.3:build (default) on project quarkus-langchain4j-integration-test-simple-ollama: Failed to build quarkus application: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[ERROR]         [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
[ERROR]         - synthetic injection point
[ERROR]         - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("org.acme.Assistant")], target=n/a]
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
[ERROR]         at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
[ERROR]         at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
[ERROR]         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
[ERROR]         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
[ERROR]         at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
[ERROR]         at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
[ERROR]         at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
[ERROR]         at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
[ERROR]         at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
[ERROR]         at java.base/java.lang.Thread.run(Thread.java:1583)
[ERROR]         at org.jboss.threads.JBossThread.run(JBossThread.java:501)
[ERROR] Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
[ERROR]         - synthetic injection point
[ERROR]         - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("org.acme.Assistant")], target=n/a]
[ERROR]         at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:518)
[ERROR]         at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
[ERROR]         ... 11 more
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :quarkus-langchain4j-integration-test-simple-ollama

I'm investigating why I'm getting this error; .enableStreamingChatModel() should fix it, but it seems this change doesn't work for DevServices.

2) Multi with QuarkusRestClientBuilder: To enable the StreamingChatModel for the bam module I'm using Multi<String> as the response type in the BamRestApi class, and with this response type the QuarkusJsonCodecFactory.SnakeCaseObjectMapperHolder.MAPPER doesn't work, so I have to force it with @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) at the class level.

3) In the project that I'm using as a test (git clone https://github.com/andreadimaio/multistreamtest.git), I have a PoemService interface

@RegisterAiService
public interface PoemService {

    @SystemMessage("You are a poet")
    @UserMessage("Write a poem of 5 lines about {topic}")
    public TokenStream poemTokenStream(String topic);

    @SystemMessage("You are a poet")
    @UserMessage("Write a poem of 5 lines about {topic}")
    public Multi<String> poemMulti(String topic);
}

used and exposed by the PoemResource class

@Path("/poem")
public class PoemResource {

    @Inject
    PoemService service;

    @GET
    @Path("/token_stream")
    @Produces(MediaType.TEXT_PLAIN)
    public Multi<String> tokenStream() {
        return Multi.createFrom().emitter(em -> {
            service.poemTokenStream("dog")
                .onNext(em::emit)
                .onComplete(s -> {
                    System.out.println(s);
                    em.complete();
                })
                .onError(em::fail)
                .start();
        });
    }

    @GET
    @Path("/multi_stream")
    @Produces(MediaType.TEXT_PLAIN)
    public Multi<String> multiStream() {
        return service.poemMulti("dog");
    }
}

In the first endpoint the LLM interface returns a TokenStream which I remapped with a Multi and the result is what I expect.

> curl http://localhost:8080/poem/token_stream
  A furry friend so loyal and true,
Tail wagging, eyes shining bright,
Bringing joy and comfort to you,
A companion through day and night,
A love that will never take flight.

In the second endpoint, the LLM interface returns a Multi, and in this case, as you can see, the response is always JSON, even though the prompt is the same.

> curl http://localhost:8080/poem/multi_stream
  {
"barks": "loud",
"fur": "soft",
"tail": "wagging",
"eyes": "brown",
"love": "unconditional"
}

I think this behavior is due to the return type, but I don't know where in the code I can check this.

@geoand :pray:

geoand commented 4 months ago

Nice!

Can you open a draft PR please? That will make it a lot easier to collaborate on the remaining issues.

vietk commented 4 months ago

Hello, just my thoughts on this, because I wanted to achieve the same: I was blocked by the fact that when the AiService method is synchronous, there is a bit of logic that I was not able to include before returning a Multi from the token stream.

andreadimaio commented 4 months ago

Some of the business logic should be in the AiServiceStreamingResponseHandler class used by the async workflow, but I haven't tested it yet. What is definitely missing is the part related to the audit and moderation. Maybe these could be added in the onComplete handler.
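
To illustrate the onComplete idea, here is a minimal sketch using plain JDK types only. BufferingTokenHandler is a hypothetical helper, not a class from quarkus-langchain4j or langchain4j: it forwards each token downstream while buffering it, and hands the assembled response to a completion hook, which is where an audit or moderation step could be plugged in.

```java
import java.util.function.Consumer;

// Hypothetical helper: forwards tokens as they arrive and exposes the full text on completion.
final class BufferingTokenHandler {
    private final StringBuilder buffer = new StringBuilder();
    private final Consumer<String> downstream;      // receives each token immediately
    private final Consumer<String> onFullResponse;  // receives the whole response once

    BufferingTokenHandler(Consumer<String> downstream, Consumer<String> onFullResponse) {
        this.downstream = downstream;
        this.onFullResponse = onFullResponse;
    }

    // Called for every streamed token.
    void onNext(String token) {
        buffer.append(token);
        downstream.accept(token);
    }

    // Called once when the stream ends; the hook sees the complete text.
    void onComplete() {
        onFullResponse.accept(buffer.toString());
    }
}
```

One consequence of this shape is that a check running in the completion hook only sees the text after the tokens have already been sent to the client, so it can record or flag a response but cannot block it.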

wfrank2509 commented 3 months ago

Hey there ... the LLM-Streaming feature would be really cool as it could come in handy for a project and would greatly improve the usability and user experience! Is there some news about that? Thanks.

geoand commented 3 months ago

The preliminary work was done by @andreadimaio and will be available with our next release which is due in a few days

andreadimaio commented 3 months ago

@wfrank2509, you must use Multi<String> as the return type of a method, if you want to use the streaming functionality.

@RegisterAiService
public interface LLMService {

    @SystemMessage("...")
    @UserMessage("...")
    Multi<String> poem(...);
}

wfrank2509 commented 3 months ago

Sounds cool - Thank you!

andreas-eberle commented 3 months ago

Hey @andreadimaio,

thanks for the implementation. I had a look at the PR and I was wondering, does this only work for WatsonX? Or does it also work for OpenAI?

Thanks!

geoand commented 3 months ago

It should work for OpenAI as well

andreadimaio commented 3 months ago

In general, Multi<String> should work for all the modules that implement the StreamingChatModel interface.

andreas-eberle commented 3 months ago

How do I enable this for the OpenAI integration? I see that the extension has an OpenAiStreamingChatModel, but it doesn't seem to work. In my code, I get the following error, where it seems it cannot find a bean. Do you know what I might be doing wrong?

2024-03-28 17:35:51,681 ERROR [io.qua.dep.dev.IsolatedDevModeMain] (main) Failed to start quarkus: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
    at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
    at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
    at java.base/java.lang.Thread.run(Thread.java:1583)
    at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
    ... 12 more

    at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:334)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:251)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:60)
    at io.quarkus.deployment.dev.IsolatedDevModeMain.firstStart(IsolatedDevModeMain.java:112)
    at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:433)
    at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:55)
    at io.quarkus.bootstrap.app.CuratedApplication.runInCl(CuratedApplication.java:138)
    at io.quarkus.bootstrap.app.CuratedApplication.runInAugmentClassLoader(CuratedApplication.java:93)
    at io.quarkus.deployment.dev.DevModeMain.start(DevModeMain.java:131)
    at io.quarkus.deployment.dev.DevModeMain.main(DevModeMain.java:62)
Caused by: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
    at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
    at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
    at java.base/java.lang.Thread.run(Thread.java:1583)
    at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
    ... 12 more

    at io.quarkus.builder.Execution.run(Execution.java:123)
    at io.quarkus.builder.BuildExecutionBuilder.execute(BuildExecutionBuilder.java:79)
    at io.quarkus.deployment.QuarkusAugmentor.run(QuarkusAugmentor.java:160)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:330)
    ... 9 more
Caused by: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
    at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
    at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
    at java.base/java.lang.Thread.run(Thread.java:1583)
    at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
    - synthetic injection point
    - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
    ... 12 more

geoand commented 3 months ago

Just doing:

    @RegisterAiService
    interface Assistant {

        Multi<String> chat(String message);
    }

works for me

andreadimaio commented 3 months ago

What version are you using? I tested it now and everything works (with OpenAI)

geoand commented 3 months ago

It should work with 0.10.z.

BTW, I will be releasing 0.10.2 in a few minutes (if nothing unexpected comes up)

andreas-eberle commented 3 months ago

Could it be because I use Kotlin? I'm using 0.10.1.

geoand commented 3 months ago

I doubt it, but with Kotlin, nothing surprises me...

If you attach a sample, I will take a look tomorrow

andreas-eberle commented 3 months ago

I found my issue... I accidentally still had the huggingface extension enabled in my playground project, and that doesn't support streaming... sorry for that :(

geoand commented 3 months ago

No problem :)