langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License
89.3k stars 14.08k forks source link

Issue: How to return a stream in api #6512

Closed dinhan92 closed 4 months ago

dinhan92 commented 1 year ago

Issue you'd like to raise.

So, I've tried to create a custom callback and try to return a stream but got no luck:

class MyCallbackHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token, **kwargs) -> None:
        # print every token on a new line
        yield token

llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo-0301", openai_api_key="openai_api_key", streaming=True, callbacks=[MyCallbackHandler()])

@app.route('/api/chatbot', methods=['GET', 'POST'])
@token_required
def chatbot(**kwargs) -> str:
    # rest of code
    tools = toolkit.get_tools()
    agent_chain = initialize_agent(tools=tools, llm=llm, agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION, 
                                   memory=memory, verbose=True)
    response = agent_chain.run(input=input_text)

    return app.response_class(response)

Suggestion:

I am thinking of return the on_llm_new_token as a stream from the custom callback, but I get no idea of doing that. How to return a stream, pls give me a solution!

devstein commented 1 year ago

Generated by a 🤖

Answer

To return a stream in your API, you can use Flask's Response class with a generator function. Modify your MyCallbackHandler class to store tokens, create a generator function to yield tokens, and modify your API route to return a streamed response. Here's how you can do it:

  1. Modify the MyCallbackHandler class to store tokens:
class MyCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        self.tokens = []

    def on_llm_new_token(self, token, **kwargs) -> None:
        self.tokens.append(token)
  1. Create a generator function to yield tokens:
def token_generator(callback_handler):
    for token in callback_handler.tokens:
        yield token
  1. Modify your API route to return a streamed response:
@app.route('/api/chatbot', methods=['GET', 'POST'])
@token_required
def chatbot(**kwargs) -> str:
    # rest of code
    tools = toolkit.get_tools()
    my_callback_handler = MyCallbackHandler()
    agent_chain = initialize_agent(tools=tools, llm=llm, agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
                                   memory=memory, verbose=True, callbacks=[my_callback_handler])
    response = agent_chain.run(input=input_text)

    return app.response_class(token_generator(my_callback_handler), mimetype='text/plain')

This will return a streamed response with each token on a new line. Note that the mimetype is set to 'text/plain' for simplicity, but you can change it to suit your needs

Sources


This response is meant to be helpful and give you a place to start. It is not meant to be a precise solution.

Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

dinhan92 commented 1 year ago

@devstein after all tokens is generated, the callback_handler.tokens still has len = 0. Otherwise, this is still not I want, I can achieve this by @stream_with_context which is when I get all the answer, then I make the answer (which is already a string) a stream. However what I want to achieve is whenever there is a token, I get that token and because of that, that will be a stream. I think, in llama index, the response_gen of as_query_engine works like that. However, I d like to make it a chat, so I am trying to make this agent work like response_gen.

image

devstein commented 1 year ago

Hey @dinhan92 the previous response was generated by my agent 🤖 , but it looks directionally correct! Thanks for the reference to llama index behavior

I'm not positive, but believe the answer is to use the async arun and run the async task in separate thread and return the generate that yields each token as they arrive. You'll also want to make sure that streaming=True for the underlying LLM call.

This is also how Llama Index implements their response_gen functionality. First they create a custom streaming callback like my agent commented above, then they start a new thread for the LLM streaming call.

Hope this helps!

dmyoung9 commented 11 months ago

I believe this would also be fixed by #8991.

diegoquezadac commented 8 months ago

I'm having a similar issue. Although I managed to stream a response using aa similar approach, when receiving concurrent API calls the responses get mixed up because both of them are using the same callback's list of tokens.

Any idea about how to save the tokens of different answers in different lists inside the callback considering, for example, a message_id ?

nsiddarthreddy commented 8 months ago

@diegoquezadac Do did you manage to get stream response?

async def send_message(user_query: str) -> AsyncIterable[str]:
    callback = AsyncIteratorCallbackHandler()

    agent = create_csv_agent(
        ChatOpenAI(
            temperature=0,
            model="gpt-4",
            openai_api_key=settings.openai_normandy_key,
            streaming=True,
            callbacks=[callback],
        ),
        local_csv_file
        agent_type=AgentType.OPENAI_FUNCTIONS,
        handle_parsing_errors=True,
    )

    response_schemas = [
        ResponseSchema(
            name="answer_explaintion",
            description="Explain how get got the answer.",
        ),
        ResponseSchema(
            name="answer",
            description="Execute the python code and return the results in json format.",
        ),
        ResponseSchema(
            name="question_answered",
            description="boolean field, set to true only if question was answered with very high confidence",
            type="boolean",
        ),
    ]
    output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
    user_query = (
        + "Here is the user question"
        + user_query
    )

    async def wrap_done(fn: Awaitable, event: asyncio.Event):
        """Wrap an awaitable with a event to signal when it's done or an exception is raised."""
        try:
            await fn
        except Exception as e:
            # TODO: handle exception
            print(f"Caught exception: {e}")
        finally:
            # Signal the aiter to stop.
            event.set()

    # Begin a task that runs in the background.
    run = asyncio.create_task(agent.arun(input=user_query))

    async for token in callback.aiter():
        yield token

    await run`
zhangleinice commented 8 months ago

@ nsiddarthreddy

openai.proxy = {
            "http": "http://127.0.0.1:7890",
            "https": "http://127.0.0.1:7890"
        }

callback = AsyncIteratorCallbackHandler()

llm = OpenAI(
    openai_api_key= os.environ["OPENAI_API_KEY"],
    temperature=0, 
    streaming=True, 
    callbacks=[callback]
)
embeddings = OpenAIEmbeddings()

# faq
loader = TextLoader("static/faq/ecommerce_faq.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)

docsearch = Chroma.from_documents(texts, embeddings)

faq_chain = RetrievalQA.from_chain_type(
    llm, 
    chain_type="stuff", 
    retriever=docsearch.as_retriever(),
)

@tool("FAQ")
def faq(input) -> str:
    """"useful for when you need to answer questions about shopping policies, like return policy, shipping policy, etc."""
    print('faq input', input)
    return faq_chain.acall(input)

tools = [faq]

memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

conversation_agent = initialize_agent(
    tools, 
    llm, 
    agent="conversational-react-description", 
    memory=memory, 
    verbose=True,
)

 async def wait_done(fn, event):
    try:
        await fn
    except Exception as e:
        print('error', e)
        # event.set()
    finally:
        event.set()

async def call_openai(question):

    # chain = faq(question)
    chain = conversation_agent.acall(question)

    coroutine = wait_done(chain, callback.done)

    task = asyncio.create_task(coroutine)

    async for token in callback.aiter():
        # print('token', token)
        yield f"{token}"

    await task

app = FastAPI()

@app.get("/")
async def homepage():
    return FileResponse('static/index.html')

@app.post("/ask")
def ask(body: dict):
    return StreamingResponse(call_openai(body['question']), media_type="text/event-stream")

if __name__ == "__main__":
   uvicorn.run(host="127.0.0.1", port=8888, app=app)

After I use agent.arun, I get this error: error faq() takes 1 positional argument but 2 were given。 There is no problem when using faq_chain.acall(input) directly

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Thought: Do I need to use a tool? No AI: 你好!很高兴认识你!

Finished chain. INFO: 127.0.0.1:65193 - "POST /ask HTTP/1.1" 200 OK

Entering new AgentExecutor chain...

Thought: Do I need to use a tool? Yes Action: FAQ Action Input: 如何更改帐户信息error faq() takes 1 positional argument but 2 were given

Suggestion: No response

dosubot[bot] commented 5 months ago

Hi, @dinhan92,

I'm helping the LangChain team manage their backlog and am marking this issue as stale. The issue you opened is about returning a stream from a custom callback in an API. There have been discussions and suggestions from other users, such as using Flask's Response class with a generator function and managing tokens for concurrent API calls. However, the issue remains unresolved.

Could you please confirm if this issue is still relevant to the latest version of the LangChain repository? If it is, please let the LangChain team know by commenting on the issue. Otherwise, feel free to close the issue yourself, or it will be automatically closed in 7 days.

Thank you for your understanding and cooperation. If you have any further questions or updates, feel free to reach out.