langchain-ai / langserve

LangServe 🦜️🏓

Streaming not working as expected when Using Huggingface Pipelines in Langserve #316

Open jefffortune opened 11 months ago

jefffortune commented 11 months ago

I am using LangServe and LangChain with Hugging Face pipelines and a streamer object.

If I use the TextStreamer object from Hugging Face, I can see the stream in stdout.

I read that I might need to use TextIteratorStreamer to make it work. With that one, I don't see any response in stdout, which is expected.

With both streamers I do get a response on the client, but only as a single dict rather than a stream of chunks:

{'question': 'Please explain AI', 'text': ' Artificial Intelligence (AI) is a branch of computer science focused on creating intelligent machines or systems that can mimic human cognitive abilities, such as learning, problem-solving, and decision-making. It aims to develop advanced algorithms and technologies capable of performing tasks typically requiring human intelligence, like recognizing patterns in data, understanding natural language, making predictions, and more. The goal is to create smarter, adaptive systems that can improve various aspects of our lives, from automating complex processes to enhancing communication between humans and computers.'}

This is the server.py code:

# Import the required packages and modules
import torch
import warnings

from fastapi import FastAPI

from langchain.chains import LLMChain
from langchain.llms.huggingface_pipeline import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langserve import add_routes
from langchain.schema.runnable import RunnablePassthrough, RunnableGenerator
from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    TextStreamer, 
    pipeline, 
    TextIteratorStreamer
)

# Set the device to GPU if available
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
# Ignore verbose warnings
warnings.filterwarnings("ignore")

# Model and Tokenizer setup
model_name = "Intel/neural-chat-7b-v3-3"
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    cache_dir=f"./model/{model_name}",
    use_fast=True
)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    cache_dir=f"./model/{model_name}",
    device_map="auto",
    trust_remote_code=False,
    torch_dtype=torch.bfloat16,
).eval()

# Streamer and pipeline setup
streamer = TextIteratorStreamer(
    tokenizer, 
    skip_prompt=True, 
    skip_special_tokens=True
)

text_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=1024,
    temperature=0.3,
    top_p=0.95,
    top_k=40,
    repetition_penalty=1.15,
    num_return_sequences=1,
    streamer=streamer,
    do_sample=True,
    pad_token_id=tokenizer.eos_token_id,
)

# Note: exactly three opening quotes; a fourth would put a stray '"' in the prompt
DEFAULT_SYSTEM_PROMPT = """You are a helpful AI that likes to talk like a pirate"""
def prompt_template(sys_prompt: str = DEFAULT_SYSTEM_PROMPT) -> str:
    """Template for the prompt to be used in the model.

    Args:
        sys_prompt (str, optional): System's prompt. Defaults to DEFAULT_SYSTEM_PROMPT.

    Returns:
        str: Prompt template.
    """
    context = "{question}"
    template = f"""### System:
{sys_prompt}
### User:
{context}
### Assistant:
"""
    return template

# LLM and Prompt setup
llm = HuggingFacePipeline(pipeline=text_pipeline)
template = prompt_template()
prompt = PromptTemplate(template=template, input_variables=["question"])
llm_chain = LLMChain(prompt=prompt, llm=llm)

# Chain Configuration
qa_chain = llm_chain | RunnablePassthrough()

# FastAPI App definition
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple API server using LangChain's Runnable interfaces",
)

# Adding chain route
add_routes(
    app,
    qa_chain,
    path="/qa_chain",
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

This is client.py:

from langserve import RemoteRunnable
from langchain.llms import TextGen
chain = RemoteRunnable("http://localhost:8080/qa_chain/")

for chunk in chain.stream({"question": "Please explain AI"}, stop=["'", "\n"]):
    print(chunk, end="", flush=True)

eyurtsev commented 11 months ago

@jefffortune Could you help me determine if this is a langserve or langchain issue (i.e., issue with the HuggingFacePipeline?)?

Does qa_chain stream on its own? (without trying to stream it via langserve)

# Chain Configuration
qa_chain = llm_chain | RunnablePassthrough()

for chunk in qa_chain.stream({"question": "Please explain AI"}, stop=["'", "\n"]):
    print(chunk, end="", flush=True)

jefffortune commented 11 months ago

When using

streamer = TextIteratorStreamer(
    tokenizer, 
    skip_prompt=True, 
    skip_special_tokens=True
)

I don't see any streaming at all, regardless of the code updates described below. When I switched to the following streamer and applied the same code updates, I could see streaming only on the server side, in the logs.

streamer = TextStreamer(
    tokenizer, 
    skip_prompt=True, 
    skip_special_tokens=True
)

When adding the following code to server.py

for chunk in qa_chain.stream({"question": "Please explain AI"}, stop=["'", "\n"]):
    print(chunk, end="", flush=True)

I got an error because stop is not a recognized property, so I updated the loop to this:

for chunk in qa_chain.stream({"question": "Please explain AI"}):
    print(chunk, end="", flush=True)

I could see it streaming successfully in the server logs.

I updated client.py with the working code from the server test, but the client is still not streaming. I can see the text streaming in the server logs, but the client's output is a dictionary.

eyurtsev commented 11 months ago

Different question -- Is the underlying transformer able to handle concurrent usage? i.e., is there any queue that can make sure that hardware resources like the GPU are used correctly?


Let's try two things:

1) Simplify the chain as much as possible: remove RunnablePassthrough, and replace LLMChain with prompt | llm

I don't think this will help, but it's worth double-checking in case something weird is happening inside LLMChain.


# LLM and Prompt setup
llm = HuggingFacePipeline(pipeline=text_pipeline)
template = prompt_template()
prompt = PromptTemplate(template=template, input_variables=["question"])

# Chain Configuration
qa_chain = prompt | llm

2) >> I could see it streaming successfully in the server logs.

Could you elaborate what you mean by server logs?

What I'm wondering is if you could test qa_chain without LangServe / FastAPI at all (i.e., there should be no server logs to look at), e.g., from a Jupyter notebook, and confirm that stream returns chunks one at a time.

If the .stream() works for this chain in jupyter, then it should work when exposed via langserve.

jefffortune commented 11 months ago

@eyurtsev, thanks for the advice on code cleanup. With this simplification, client.py no longer receives a dict; it receives the generated text. Below is the updated code showing the current state, along with more detail about the streaming output I see in the server console log.

server.py

# Import the required packages and modules
import torch
import warnings

from fastapi import FastAPI

from langchain.llms.huggingface_pipeline import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langserve import add_routes
from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    TextStreamer, 
    pipeline, 
    TextIteratorStreamer
)

# Ignore verbose warnings
warnings.filterwarnings("ignore")

# Model and Tokenizer setup
model_name = "Intel/neural-chat-7b-v3-3"
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    cache_dir=f"./model/{model_name}",
    use_fast=True
)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    cache_dir=f"./model/{model_name}",
    device_map="auto",
    trust_remote_code=False,
    torch_dtype=torch.bfloat16,
).eval()

# Streamer and pipeline setup
streamer = TextIteratorStreamer(
    tokenizer, 
    skip_prompt=True, 
    skip_special_tokens=True
)

text_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=1024,
    temperature=0.3,
    top_p=0.95,
    top_k=40,
    repetition_penalty=1.15,
    num_return_sequences=1,
    streamer=streamer,
    do_sample=True,
    pad_token_id=tokenizer.eos_token_id,
)

# Note: exactly three opening quotes; a fourth would put a stray '"' in the prompt
DEFAULT_SYSTEM_PROMPT = """You are a helpful AI."""
def prompt_template(sys_prompt: str = DEFAULT_SYSTEM_PROMPT) -> str:
    """Template for the prompt to be used in the model.

    Args:
        sys_prompt (str, optional): System's prompt. Defaults to DEFAULT_SYSTEM_PROMPT.

    Returns:
        str: Prompt template.
    """
    context = "{question}"
    template = f"""### System:
{sys_prompt}
### User:
{context}
### Assistant:
"""
    return template

# LLM and Prompt setup
llm = HuggingFacePipeline(pipeline=text_pipeline)
template = prompt_template()
prompt = PromptTemplate(template=template, input_variables=["question"])

# Chain Configuration
qa_chain = prompt | llm

for chunk in qa_chain.stream({"question": "Please explain AI"}):
    print(chunk, end="", flush=True)

# FastAPI App definition
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple API server using LangChain's Runnable interfaces",
)

# Adding chain route
add_routes(
    app,
    qa_chain,
    path="/qa_chain",
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

client.py

from langserve import RemoteRunnable

chain = RemoteRunnable("http://localhost:8080/qa_chain/")
payload = {
    "question": "Tell a joke about troubleshooting."
}

for chunk in chain.stream(payload):
    print(chunk, end="[END]", flush=True)

Note that this code uses TextIteratorStreamer. The screenshot shows the resulting output log. [image]

When I update the streamer class to the following code, you can see it stream to the console.

# Streamer and pipeline setup
streamer = TextStreamer(
    tokenizer, 
    skip_prompt=True, 
    skip_special_tokens=True
)

This screenshot shows the server console after the output has streamed to it. [image]

eyurtsev commented 11 months ago

@jefffortune

Thank you for attaching the screenshots!

Could you test this out:

for chunk in qa_chain.stream({"question": "Please explain AI"}):
    print(chunk)
    print('--')

You should be seeing -- interleaving every few characters if streaming is actually working.
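
A minimal sketch of this check, for anyone who wants to automate it rather than eyeball the `--` markers. It uses stand-in generators instead of the real chain: `fake_streaming` and `fake_blocking` are hypothetical, and with the real chain you would pass `qa_chain.stream({...})` to `collect_chunks` instead. The idea is simply that a stream which yields one chunk holding the whole answer is not really streaming.

```python
from typing import Iterable, Iterator, List


def collect_chunks(stream: Iterable[str]) -> List[str]:
    """Drain a stream and return its chunks in arrival order."""
    return [chunk for chunk in stream]


def looks_streamed(chunks: List[str]) -> bool:
    """Heuristic: more than one non-empty chunk suggests incremental output."""
    return len([c for c in chunks if c]) > 1


def fake_streaming() -> Iterator[str]:
    # Hypothetical stand-in for a chain whose .stream() yields piece by piece.
    yield from ["Artificial ", "Intelligence ", "is ..."]


def fake_blocking() -> Iterator[str]:
    # Hypothetical stand-in for a chain that only yields the finished text.
    yield "Artificial Intelligence is ..."


if __name__ == "__main__":
    for name, stream in [("streaming", fake_streaming()),
                         ("blocking", fake_blocking())]:
        chunks = collect_chunks(stream)
        print(name, len(chunks), looks_streamed(chunks))
```

The chunk count is only a heuristic. It does not measure when each chunk arrives, so a chain that buffers everything and then emits several chunks at once would still pass it.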

eyurtsev commented 11 months ago

@jefffortune Different and more important question --

Is the underlying transformer able to handle concurrent usage? i.e., is there any queue that can make sure that hardware resources like the GPU are used correctly?

If it doesn't handle concurrent usage correctly, it won't work if deployed via a server (regardless of whether it's used via invoke or stream)
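
One possible way to get the kind of queueing asked about here is to guard the model call with a lock, so that only one generation runs at a time. The sketch below is an illustration under stated assumptions, not something taken from LangServe or transformers: `SerializedModel` and `fake_generate` are hypothetical names, and `fake_generate` merely stands in for a GPU-bound pipeline call.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


class SerializedModel:
    """Wraps a generate function so that only one call runs at a time."""

    def __init__(self, generate_fn: Callable[[str], str]) -> None:
        self._generate_fn = generate_fn
        self._lock = threading.Lock()
        self._active = 0
        self.max_active = 0  # highest number of overlapping calls observed

    def generate(self, prompt: str) -> str:
        # Callers block here until the previous generation has finished.
        with self._lock:
            self._active += 1
            self.max_active = max(self.max_active, self._active)
            try:
                return self._generate_fn(prompt)
            finally:
                self._active -= 1


def fake_generate(prompt: str) -> str:
    # Hypothetical stand-in for a GPU-bound pipeline call.
    time.sleep(0.01)
    return f"answer to: {prompt}"


def run_concurrently(model: SerializedModel, prompts: List[str]) -> List[str]:
    """Submit all prompts from a thread pool; results keep the input order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(model.generate, prompts))


if __name__ == "__main__":
    model = SerializedModel(fake_generate)
    print(run_concurrently(model, ["a", "b", "c", "d"]))
    print("max overlapping calls:", model.max_active)
```

A plain lock trades throughput for safety: requests are handled strictly one after another. In an async server, an `asyncio.Lock` or `asyncio.Semaphore` would play the same role without blocking the event loop.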

sukidesuka commented 8 months ago

@jefffortune

Thank you for attaching the screenshots!

Could you test this out:

for chunk in qa_chain.stream({"question": "Please explain AI"}):
    print(chunk)
    print('--')

You should be seeing -- interleaving every few characters if streaming is actually working.

After trying, I found that it only outputs "--" on the last line, but before outputting "--", the LLM does indeed stream to the console.

sukidesuka commented 8 months ago

I think a callback or something else is needed to allow functions like chain.invoke or chain.astream to capture console output, or to use TextIteratorStreamer to capture the output by themselves, and then transmit it to LangServe. Alternatively, add something in the middle of the chain to allow the entire chain.astream to output in chunks.
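
The TextIteratorStreamer idea mentioned here is usually applied by running the blocking generate call in a background thread while the caller iterates over the streamer. The sketch below imitates that pattern with the standard library only, so it runs without a model: `QueueStreamer`, `fake_generate`, and `stream_tokens` are hypothetical stand-ins, not transformers or LangChain APIs. It also creates a fresh streamer per request, whereas the server.py above builds a single streamer at import time and shares it through the pipeline. Whether that sharing contributes to the behavior reported in this issue is an assumption, not something confirmed in the thread.

```python
import queue
import threading
from typing import Iterator

_SENTINEL = object()  # marks the end of generation


class QueueStreamer:
    """Hypothetical stand-in: the producer calls put()/end(), the consumer iterates."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()

    def put(self, text: str) -> None:
        self._queue.put(text)

    def end(self) -> None:
        self._queue.put(_SENTINEL)

    def __iter__(self) -> Iterator[str]:
        while True:
            item = self._queue.get()  # blocks until the producer adds something
            if item is _SENTINEL:
                return
            yield str(item)


def fake_generate(prompt: str, streamer: QueueStreamer) -> None:
    # Hypothetical stand-in for a blocking generate call that feeds a streamer.
    try:
        for word in prompt.split():
            streamer.put(word.upper() + " ")
    finally:
        streamer.end()  # always signal completion, even on error


def stream_tokens(prompt: str) -> Iterator[str]:
    """Run generation in a background thread and yield text as it arrives."""
    streamer = QueueStreamer()  # a fresh streamer for each request
    worker = threading.Thread(target=fake_generate, args=(prompt, streamer))
    worker.start()
    try:
        yield from streamer
    finally:
        worker.join()


if __name__ == "__main__":
    for chunk in stream_tokens("please explain ai"):
        print(chunk, end="", flush=True)
```

A generator shaped like `stream_tokens` is the kind of object that could be placed "in the middle of the chain" so that downstream consumers receive chunks instead of one final string.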

vishal-ajmera commented 5 months ago

@jefffortune

I am also facing this issue. Did you get it resolved? Could you please post your analysis?