langchain-ai / langserve

LangServe 🦜️🏓

`stream` API returns entire answer after a while, instead of actually streaming the answer for `ConversationalRetrievalChain` #218

Open tigerinus opened 7 months ago

tigerinus commented 7 months ago

Hello,

I built a simple langchain app using ConversationalRetrievalChain and langserve.

It works great with the invoke API. However, the stream API returns the entire answer after a while instead of actually streaming it.

Here is the langserve part:

    streamer = TextStreamer(tokenizer=tokenizer, skip_prompt=True)

    pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_length=1000,
        temperature=0.1,
        top_p=0.95,
        top_k=40,
        repetition_penalty=1.1,
        do_sample=True,
        streamer=streamer,
        # device=0,  # -1 for CPU
    )

    tokenizer.pad_token_id = model.config.eos_token_id

    llm = HuggingFacePipeline(
        pipeline=pipeline,
    )

    condenseQuestionPrompt = PromptTemplate.from_template(CONDENSE_QUESTION_TEMPLATE)

    prompt = PromptTemplate(template=TEMPLATE, input_variables=["context", "question"])
    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        verbose=True,
        condense_question_prompt=condenseQuestionPrompt,
        combine_docs_chain_kwargs={"prompt": prompt},
        response_if_no_docs_found="I'm afraid that I don't have an answer to the question.",
    ).with_types(input_type=ChatHistory)

    app = FastAPI(
        title="app",
        version="0.1",
        description="The AI which knows everything about XYZ products.",
    )

    app.add_middleware(
        GZipMiddleware,
        minimum_size=1024,
    )

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )

    add_routes(app, chain)

    uvicorn.run(app, host="0.0.0.0", port=8000)

Please help. Thanks!

eyurtsev commented 7 months ago

@tigerinus could you confirm that this is a langserve issue rather than a langchain issue? i.e.,

    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        verbose=True,
        condense_question_prompt=condenseQuestionPrompt,
        combine_docs_chain_kwargs={"prompt": prompt},
        response_if_no_docs_found="I'm afraid that I don't have an answer to the question.",
    ).with_types(input_type=ChatHistory)

    for chunk in chain.stream(..):  # <-- Does this work?
        ...

tigerinus commented 7 months ago

> @tigerinus could you confirm that this is a langserve issue rather than a langchain issue? i.e.,
>
>     chain = ConversationalRetrievalChain.from_llm(
>         llm=llm,
>         retriever=retriever,
>         verbose=True,
>         condense_question_prompt=condenseQuestionPrompt,
>         combine_docs_chain_kwargs={"prompt": prompt},
>         response_if_no_docs_found="I'm afraid that I don't have an answer to the question.",
>     ).with_types(input_type=ChatHistory)
>
>     for chunk in chain.stream(..):  # <-- Does this work?
>         ...

Yes, it works. It prints the answer token by token.

    for chunk in chain.stream({"question": "What's capital city of Canada?", "chat_history": []}):
        print(chunk)
eyurtsev commented 7 months ago

Could you check what happens if you remove the middleware for gzip compression?

tigerinus commented 7 months ago

> Could you check what happens if you remove the middleware for gzip compression?

That didn't help.

How exactly should I call the /stream endpoint?

Does it work elsewhere?

eyurtsev commented 7 months ago

Using the RemoteRunnable client. The RemoteRunnable should point at the path where the runnable is mounted (not at the /stream endpoint).

See example here in the middle that streams: https://github.com/langchain-ai/langserve/blob/main/examples/llm/client.ipynb

The server should stream if the underlying runnable used by the server can stream. You can also try astream_log from the RemoteRunnable.
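
For anyone who still wants to call the endpoint without the client, here is a minimal sketch of what such a request could look like. It assumes the runnable is mounted at the server root, that the endpoint accepts a JSON body with an `input` key, and that the input has the `question` / `chat_history` shape used earlier in this thread. `build_stream_request` is a hypothetical helper, not part of langserve, and it only builds the request without sending it.

```python
import json
from urllib.request import Request


def build_stream_request(base_url, question, chat_history):
    """Build (but do not send) a POST request for a /stream endpoint."""
    # Assumption: the chain input goes under the "input" key of the JSON body.
    body = {"input": {"question": question, "chat_history": chat_history}}
    return Request(
        url=base_url.rstrip("/") + "/stream",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        },
        method="POST",
    )


req = build_stream_request(
    "http://localhost:8000/", "What's the capital city of Canada?", []
)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

The same body shape applies from a JavaScript frontend; only the HTTP client differs.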

tigerinus commented 7 months ago

I will try that notebook.

However our frontend is written in VueJS. Would be nice to have an example code in JS.

If we call /stream directly, the answer comes back as one whole chunk:

image
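
When reading the raw response, it can help to split it into individual events to see how many chunks the server really sent. The sketch below assumes the endpoint emits server-sent events, with `event: data` lines carrying a JSON payload and a final `event: end`; `iter_sse_events` and `iter_chunks` are hypothetical helpers written for this illustration, and the sample lines are made up.

```python
import json


def iter_sse_events(lines):
    """Yield (event, data) pairs from an iterable of decoded SSE lines."""
    event, data_lines = None, []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if event is not None or data_lines:
                yield event or "message", "\n".join(data_lines)
            event, data_lines = None, []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].lstrip(" "))


def iter_chunks(lines):
    """Yield the decoded payload of each data event until the end event."""
    for event, data in iter_sse_events(lines):
        if event == "end":
            break
        if event == "data":
            yield json.loads(data)


# Made-up sample of what a token-by-token stream might look like.
sample = [
    "event: data", 'data: "Ott"', "",
    "event: data", 'data: "awa"', "",
    "event: end", "",
]
print(list(iter_chunks(sample)))
```

If the whole answer arrives inside a single `data` event, the runnable behind the endpoint produced only one chunk, and no client-side change will make it stream.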

tigerinus commented 7 months ago

@eyurtsev I have no idea how to get the notebook example working for my app:

I tried

    from langchain.prompts.chat import ChatPromptTemplate

    from langserve import RemoteRunnable
    llm = RemoteRunnable('http://localhost:8000')

    prompt = ChatPromptTemplate.from_messages({
        "input": {
            "question": "Hi",
            "chat_history": []
        }
    }).format_messages()

    llm.invoke(prompt)

But the backend always says something like:

    INFO:     127.0.0.1:52946 - "POST /invoke HTTP/1.1" 500 Internal Server Error
    ERROR:    Exception in ASGI application
    Traceback (most recent call last):
      File "/home/tiger/dev/IceWhaleTech/IceWhale-101-GPT/llm_app_langserve/venv/lib/python3.11/site-packages/pydantic/v1/main.py", line 716, in validate
        value_as_dict = dict(value)
                        ^^^^^^^^^^^
    ValueError: dictionary update sequence element #0 has length 4; 2 is required

eyurtsev commented 7 months ago

for js: Have you tried the js remote runnable client? https://api.js.langchain.com/classes/runnables_remote.RemoteRunnable.html

absent any bugs, it should be a drop-in replacement -- so you could launch the server code provided in the example, confirm the python client code works, and then replace it with the js version

For your code snippet could you include both server and client code?

In the example I referenced the remote runnables take a path to the mounted location of the runnable:

    openai_llm = RemoteRunnable("http://localhost:8000/openai/")
    anthropic = RemoteRunnable("http://localhost:8000/anthropic/")

gabegaz commented 7 months ago

I also encountered the same issue

eyurtsev commented 7 months ago

@gabegaz do you have a snippet of both client and server code you're using? If i have full code, I can try to recreate locally

gabegaz commented 7 months ago

Here is my server side code snippet:

    from langchain.llms import LlamaCpp
    from langchain.callbacks.manager import CallbackManager
    from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
    from fastapi import FastAPI
    from langserve import add_routes

    n_gpu_layers = 1
    n_batch = 512
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])

    llm = LlamaCpp(
        model_path=mistral+"mistral-7b-instruct-v0.1.Q5_K_M.gguf",
        n_gpu_layers=n_gpu_layers,
        n_batch=n_batch,
        n_ctx=2048,
        f16_kv=True,
        callback_manager=callback_manager,
        verbose=True,
    )

    app = FastAPI(
        title="LangChain Server",
        version="1.0",
        description="A simple api server using Langchain's Runnable interfaces",
    )

    add_routes(
        app,
        llm,
        path="/datalem",
    )

    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="localhost", port=8000)

And here is the client code:

    from langchain.prompts.chat import ChatPromptTemplate
    from langserve import RemoteRunnable
    from .models import Chat
    from django.http import JsonResponse
    from django.utils import timezone
    from django.shortcuts import render

    datalem = RemoteRunnable("http://localhost:5000/datalem/")

    def chatbot_langserve(request):
        chats = Chat.objects.filter(user=request.user.id)

        if request.method == 'POST':
            message = request.POST.get('message')

            prompt = ChatPromptTemplate.from_messages(
                [
                    (
                        "system",
                        "You are a highly educated person who loves to use big words. "
                        + "You are also concise. Never answer in more than three sentences.",
                    ),
                    ("human", message),
                ]).format_messages()

            # response =datalem.invoke(prompt)
            for chunk in datalem.stream(prompt):
                response=(chunk.content, end="", flush=True)

            chat = Chat(user=request.user, message=message,
                        response=response, created_at=timezone.now)
            chat.save()
            return JsonResponse({'message': message, 'response': response})
        return render(request, 'chatbot.html', {'chats': chats})

My problem is on the client side. How can I get the response from this print() function?

    # response =datalem.invoke(prompt)
    for chunk in datalem.stream(prompt):
        print(chunk.content, end="", flush=True)

eyurtsev commented 7 months ago

@tigerinus , @gabegaz Thanks for the code. The underlying chain (ConversationalRetrievalChain) does not support streaming, so this is not a langserve issue.

TLDR:

Use this for conversational retriever: https://python.langchain.com/docs/expression_language/cookbook/retrieval#conversational-retrieval-chain

How to debug whether streaming is supported

The best way to debug is to run the chain that the server uses directly (without any server in the middle) and confirm whether it streams or not.

In this case, you can test in the following manner:


    import asyncio
    from typing import List, Tuple

    from langchain.chains import ConversationalRetrievalChain
    from langchain.chat_models import ChatOpenAI
    from langchain.embeddings import OpenAIEmbeddings
    from langchain.pydantic_v1 import BaseModel, Field
    from langchain.vectorstores import FAISS

    vectorstore = FAISS.from_texts(
        ["cats like fish", "dogs like sticks"], embedding=OpenAIEmbeddings()
    )
    retriever = vectorstore.as_retriever()
    model = ChatOpenAI()


    # User input
    class ChatHistory(BaseModel):
        """Chat history with the bot."""

        chat_history: List[Tuple[str, str]] = Field(
            ...,
            extra={"widget": {"type": "chat", "input": "question", "output": "answer"}},
        )
        question: str


    chain = ConversationalRetrievalChain.from_llm(model, retriever).with_types(
        input_type=ChatHistory
    )


    async def main():
        # Print every chunk the chain emits; a streaming chain yields many.
        async for chunk in chain.astream(
            {"question": "tell me a long story about bears", "chat_history": [("hi", "hi")]}
        ):
            print(chunk)


    # In a notebook, run `await main()` instead.
    asyncio.run(main())

You'll see that the underlying chain only has one chunk! So it definitely does not stream.

If you create it using LCEL, confirm first that streaming works server side and then hook it into langserve.
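
The check described above boils down to counting how many chunks an iterator yields and how far apart they arrive. Below is a minimal sketch of that idea using only the standard library. `describe_stream` is a hypothetical helper, and the two generator functions are stand-ins for `chain.stream(...)` on a streaming and a non-streaming chain.

```python
import time


def describe_stream(chunks):
    """Consume an iterator; return (chunk count, seconds from first to last chunk)."""
    count = 0
    first = last = None
    for _ in chunks:
        now = time.monotonic()
        if first is None:
            first = now
        last = now
        count += 1
    spread = (last - first) if count else 0.0
    return count, spread


def fake_streaming_chain():
    # Stand-in for a chain that yields the answer token by token.
    for token in ["Once", " upon", " a", " time"]:
        yield token


def fake_blocking_chain():
    # Stand-in for a chain that yields the whole answer as a single chunk.
    yield {"answer": "Once upon a time"}


print(describe_stream(fake_streaming_chain())[0])
print(describe_stream(fake_blocking_chain())[0])
```

With a real chain, the call would look like `describe_stream(chain.stream({...}))`; a count of 1 means there is nothing for the server to stream.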

eyurtsev commented 7 months ago

Updated the example: https://github.com/langchain-ai/langserve/pull/238

See this code: https://github.com/langchain-ai/langserve/blob/main/examples/conversational_retrieval_chain/server.py and corresponding client: https://github.com/langchain-ai/langserve/blob/main/examples/conversational_retrieval_chain/client.ipynb

tigerinus commented 7 months ago

Is it by design or a bug that ConversationalRetrievalChain doesn't support streaming?

eyurtsev commented 7 months ago

Neither/both?

ConversationalRetrievalChain is an old style langchain chain that doesn't fully utilize LCEL.

A lot of the older objects do not support all the capabilities that LCEL offers (e.g., native async, optimized batching, stream, stream log etc.)

We're not deprecating the old style objects since users depend on them, but we're also not enhancing them in many cases since it's hard and also doesn't really make sense, since LCEL offers a lot more flexibility to customize the behavior properly for the given application domain.

rickknowles-cognitant commented 6 months ago

Thanks @eyurtsev - the previous reply was super helpful to understand the nature of the problem I've been seeing (which is pretty similar to the OP's problem).

My question is then: is there something we can do as a workaround ? It appears that Chainlit has done something async using a callback handler that is effectively granting them the ability to do it.

image

(image taken from https://docs.chainlit.io/api-reference/integrations/langchain )

The source for the handler appears here: https://github.com/Chainlit/chainlit/blob/main/backend/chainlit/langchain/callbacks.py

it's then invoked in chainlit message calls as:

image

From what I can see in the source (although I'm not sure if I'm reading it right) they are using a combination of a Tracer subclass and consuming the entire thing to pull out the element they want ?

Is this an advisable approach ? Or is there something else you'd recommend to people who wanted to get to this sort of functionality in a langserve environment ?

Thanks in advance,

Rick
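
The general pattern behind a callback-based workaround can be sketched without any framework: run the blocking call in a background thread, have its token callback push into a queue, and expose the queue as a generator. This is only an illustration of the pattern, not a description of how Chainlit implements it and not an approach confirmed by the maintainers in this thread. `stream_from_callback` and `fake_chain` are hypothetical names; with LangChain, the `on_token` function would presumably be called from a callback handler's `on_llm_new_token` method.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def stream_from_callback(run):
    """Run `run(on_token)` in a background thread and yield tokens as they arrive."""
    tokens = queue.Queue()

    def worker():
        try:
            run(tokens.put)  # every token passed to the callback is queued
        except Exception as exc:
            tokens.put(exc)  # forward errors to the consumer
        finally:
            tokens.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = tokens.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


def fake_chain(on_token):
    # Stand-in for a blocking chain call that reports tokens through a callback.
    for token in ["The", " capital", " is", " Ottawa"]:
        on_token(token)


print("".join(stream_from_callback(fake_chain)))
```

The generator returned by `stream_from_callback` can then be consumed like any other stream, for example by a streaming HTTP response.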

mhadi4194 commented 4 months ago
> My problem is on the client side. How can I get response from this print() function? ` # response =datalem.invoke(prompt) for chunk in datalem.stream(prompt): print(chunk.content, end="", flush=True)`

@gabegaz As you mentioned, the problem seems to be on the client side. As the client receives tokens, it is supposed to render the HTML or update the user interface (which is typically done in another language such as JS).

see this example that I got from googling: "how to render streaming response in python on client side?"

instead of this:

if request.method == 'POST':
    message = request.POST.get('message')

    prompt = ChatPromptTemplate.from_messages(
         [
              (
                   "system",
                   "You are a highly educated person who loves to use big words. "
                   + "You are also concise. Never answer in more than three sentences.",                       ),
                   ("human", message),
                   ]).format_messages()

    # response =datalem.invoke(prompt)
    for chunk in datalem.stream(prompt):
        response=(chunk.content, end="", flush=True)

    chat = Chat(user=request.user, message=message,
                response=response, created_at=timezone.now)
    chat.save()
    return JsonResponse({'message': message, 'response': response})
return render(request, 'chatbot.html', {'chats': chats})

execute something like this code on client side:

    from django.http import StreamingHttpResponse
    from django.shortcuts import render
    from langchain.prompts.chat import ChatPromptTemplate
    from langserve import RemoteRunnable

    datalem = RemoteRunnable("http://localhost:5000/datalem/")

    def chatbot_langserve(request):
        # Saving the chat is removed to keep it simple.
        # chats = Chat.objects.filter(user=request.user.id)

        if request.method == 'POST':
            message = request.POST.get('message')

            # message = input()
            # Typically the client sends requests to the server. I'm not sure about
            # your setup; it appears that the "client" is processing a POST request
            # instead of receiving user input from a UI / console?

            prompt = ChatPromptTemplate.from_messages(
                [
                    (
                        "system",
                        "You are a highly educated person who loves to use big words. "
                        + "You are also concise. Never answer in more than three sentences.",
                    ),
                    ("human", message),
                ]).format_messages()

            def generate():
                # chunk.content assumes message chunks; a plain LLM may yield strings.
                for chunk in datalem.stream(prompt):
                    yield chunk.content

            # StreamingHttpResponse sends each chunk as it is produced; a plain
            # HttpResponse would consume the whole generator before responding.
            return StreamingHttpResponse(generate(), content_type='text/plain')

        return render(request, 'chatbot.html')