langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License

Title: Request for Streaming Outputs as a Generator for Dynamic Chat Responses #2428

Closed: tugot17 closed this issue 1 year ago

tugot17 commented 1 year ago

Issue Description:

I'm looking for a way to obtain streaming outputs from the model as a generator, which would enable dynamic chat responses in a front-end application. While this functionality is available in the OpenAI API, I couldn't find a similar option in LangChain.

I'm aware that using verbose=True allows us to see the streamed results printed in the console, but I'm specifically interested in a solution that can be integrated into a front-end app, rather than just displaying the results in the console.

Is there any existing functionality or workaround to achieve this, or could this feature be considered for future implementation?

The way I imagine this working is something like the following:

from langchain.callbacks import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage
)

chat = ChatOpenAI(streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), verbose=False, temperature=0)

messages = [
    SystemMessage(content="You are a helpful assistant that translates English to French."),
    HumanMessage(content="Translate this sentence from English to French. I love programming. I really really really  really love it")
]

for chunk in chat(messages):
    yield chunk.text
Jeru2023 commented 1 year ago

Take a look at the chat-langchain source code for reference.

moraneden commented 1 year ago

@Jeru2023, I am looking for an example that uses SSE instead of websockets. Can you help?

LawlightXY commented 1 year ago

@tugot17 Are there any ideas now? I have the same issue.

Jeru2023 commented 1 year ago

@tugot17 Are there any ideas now? I have the same issue.

https://github.com/hwchase17/chat-langchain/blob/master/callback.py - line 16
https://github.com/hwchase17/chat-langchain/blob/master/main.py - line 39

tugot17 commented 1 year ago

@Jeru2023

This uses websockets, which are not so easy to work with. We are looking for an example showing how we can just get the reply from the chain as a generator.

As far as I understand, using the model output as a streamed chat in Gradio is not possible?

jianhuihi commented 1 year ago

How can I use ConversationChain with streamed responses?

class EnqueueCallbackHandler(AsyncCallbackHandler):
    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.token_count = 0

    @property
    def always_verbose(self) -> bool:
        return True

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        for c in token:
            print(c)
            print("==================================")
            await self.queue.put(c)
        self.token_count += 1

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        await self.queue.put(None)

    async def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        await self.queue.put(None)

class ChatView(APIView):
    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated,]
    async def process_request(self, request):
        message_id = request.data.get('message_id')
        userprompt = request.data.get('prompt')
        queue = asyncio.Queue()
        enqueue_callback_handler = EnqueueCallbackHandler(queue)
        callback_manager = AsyncCallbackManager([enqueue_callback_handler])
        llm = ChatOpenAI(streaming=True, verbose=True, temperature=0.7, callback_manager=callback_manager)
        message_history = RedisChatMessageHistory(message_id, url=CHAT_REDIS_URL, ttl=600)
        tokens_num = llm.get_num_tokens_from_messages(message_history.messages)
        if tokens_num > 1000:
            pass

        prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template("The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know."),
            MessagesPlaceholder(variable_name="message_history"),
            HumanMessagePromptTemplate.from_template("{input}")
        ])

        memory = ConversationBufferMemory(return_messages=True, memory_key="message_history", chat_memory=message_history)
        print("memory", memory)
        chain = ConversationChain(memory=memory, prompt=prompt, llm=llm, verbose=True)
        await chain.arun(input=userprompt)

        # how can i use ConversationChain with stream responses?
        def generate_data():
            # note: queue is an asyncio.Queue, so queue.get() here returns a
            # coroutine rather than a token -- this is the part I can't get to work
            while True:
                token = queue.get()
                print(token)
                if token is None:
                    break
                yield token
            for query in [1, 2, 3, 4, 5]:
                print(query)
                yield query
        return StreamingHttpResponse(generate_data(), content_type='text/event-stream')

    def post(self, request):
        response = asyncio.run(self.process_request(request))
        return response
jianhuihi commented 1 year ago

Websockets can work!

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()

    async def disconnect(self, close_code):
        pass

    async def receive(self, text_data):
        # data received from the WebSocket
        data = json.loads(text_data)
        userprompt = data['prompt']
        message_id = data['message_id']
        callback_handler = AsyncIteratorCallbackHandler()
        callback_manager = AsyncCallbackManager([callback_handler])
        llm = ChatOpenAI(streaming=True, verbose=True, temperature=0.7, callback_manager=callback_manager)
        message_history = RedisChatMessageHistory(message_id, url=CHAT_REDIS_URL, ttl=600)
        tokens_num = llm.get_num_tokens_from_messages(message_history.messages)
        if tokens_num > 1000:
            pass

        prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template("The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know."),
            MessagesPlaceholder(variable_name="message_history"),
            HumanMessagePromptTemplate.from_template("{input}")
        ])
        memory = ConversationBufferMemory(return_messages=True, memory_key="message_history", chat_memory=message_history)
        chain = ConversationChain(memory=memory, prompt=prompt, llm=llm, verbose=False)
        run = asyncio.create_task(chain.arun(input=userprompt))

        # set up your async generator here
        async for token in callback_handler.aiter():
            await self.send(text_data=token)            
        await run

It's too difficult to use Django's StreamingHttpResponse.

jrhe commented 1 year ago

@tugot17 You can implement this by running the LLM call in a thread and using a queue. You need to write a custom callback handler that takes the queue in its constructor and pushes each token onto the queue in the token callback. In the main method, you can iterate over the queue and yield the results.

tugot17 commented 1 year ago

@jrhe Do you happen to have an example?

jrhe commented 1 year ago

@tugot17 This can definitely be improved, but will do what you want:

from queue import Queue
from typing import Any

import gradio as gr
from anyio.from_thread import start_blocking_portal
from langchain.callbacks.base import AsyncCallbackHandler, AsyncCallbackManager
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

class StreamingLLMCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming LLM responses to a queue."""

    def __init__(self, q):
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.q.put(token)

def bot(chatbot):
    user_msg = chatbot[-1][0]

    prompt = user_msg
    q = Queue()
    job_done = object()

    async def task(prompt):
        llm = ChatOpenAI(
            verbose=True,
            temperature=0,
            streaming=True,
            callback_manager=AsyncCallbackManager([StreamingLLMCallbackHandler(q)]),
        )

        ret = await llm.agenerate([[HumanMessage(content=prompt)]])
        q.put(job_done)
        return ret

    with start_blocking_portal() as portal:
        portal.start_task_soon(task, prompt)

        content = ""
        while True:
            next_token = q.get(True, timeout=1)
            if next_token is job_done:
                break
            content += next_token
            chatbot[-1][1] = content

            yield chatbot

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox(label="Prompt")

    def chat(user_message, history):
        return "", history + [[user_message, None]]

    msg.submit(chat, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, chatbot
    )

demo.queue()
demo.launch()
coffin5257 commented 1 year ago

The default AsyncIteratorCallbackHandler has a bug. In the example by @jianhuihi, during the first chat, on_llm_end is triggered before self.queue is empty, causing an error when other.pop().cancel() is called. After working around that with if other.pop():, during the second chat the while not self.queue.empty() or not self.done.is_set() loop in aiter exits before on_llm_start (especially when executed inside a chain), so aiter ends immediately and returns empty messages.

Maybe we can call self.done.clear() before breaking out of the while loop, or use a user-controlled queue as @jrhe suggested.
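
For reference, a rough, untested sketch of that first workaround (the subclass name is made up for illustration, and the internals of AsyncIteratorCallbackHandler vary between versions):

from typing import AsyncIterator

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler


class ReusableAsyncIteratorCallbackHandler(AsyncIteratorCallbackHandler):
    """Hypothetical subclass that re-arms the handler after each run, so a
    second chat turn does not see a stale `done` event and exit immediately."""

    async def aiter(self) -> AsyncIterator[str]:
        # drain tokens exactly as the base class does ...
        async for token in super().aiter():
            yield token
        # ... then clear the completion flag so the handler can be reused
        self.done.clear()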

tron19920125 commented 1 year ago

@tugot17 This can definitely be improved, but will do what you want:

(Gradio example from @jrhe quoted here; see the full code earlier in the thread.)

@jrhe Hi, when I modified your code, an error popped up. Could you point out the problem for me?

async def task(prompt):
      template = """XXXXXXX
      Q:{question} 
      A:
      """
      template_sum = """XXXXXXX
      Q:{question} 
      A:
      """
      tps = PromptTemplate(template=template_sum, input_variables=["question"])
      tp = PromptTemplate(template=template, input_variables=["question"])
      fqs = tps.format(question=prompt)
      index = faiss.read_index(index_name)
      with open(namespace, "rb") as f:
          docsearch = pickle.load(f)
      docsearch.index = index
      llm = ChatOpenAI(streaming=True, callback_manager=AsyncCallbackManager([StreamingLLMCallbackHandler(q)]), verbose=True, temperature=0,openai_api_key=OPENAI_API_KEY)
      chain = load_qa_chain(llm,chain_type="stuff")
      docs = docsearch.similarity_search(query, include_metadata=True,k=10)
      # r = await chain.arun(input_documents=docs, question=fqs)
      # fq = tp.format(question=r)
      ret = await chain.arun(input_documents=docs, question=fqs)
      q.put(job_done)
      return ret
Traceback (most recent call last):
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/routes.py", line 393, in run_predict
    output = await app.get_blocks().process_api(
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/blocks.py", line 1059, in process_api
    result = await self.call_function(
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/blocks.py", line 882, in call_function
    prediction = await anyio.to_thread.run_sync(
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/conda/envs/chatgpt39/lib/python3.9/site-packages/gradio/utils.py", line 549, in async_iteration
    return next(iterator)
  File "/home/notebook/data/personal/coding/gpt/chatgpt/t3.py", line 148, in bot
    next_token = q.get(True, timeout=1)
  File "/opt/conda/envs/chatgpt39/lib/python3.9/queue.py", line 179, in get
    raise Empty
_queue.Empty
coffin5257 commented 1 year ago

(Quoting @tron19920125's comment and the _queue.Empty traceback above.)

It means q is still empty after you wait for 1 s. Maybe you should wait for the token without a timeout, or increase the default timeout. Here is the docstring from Python's queue module:

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
tron19920125 commented 1 year ago


@coffin5257 Thanks for the kind help. I just set the timeout to None, but it still doesn't return anything. Would it be possible that there are other causes?

bent-verbiage commented 1 year ago

@jrhe thanks so much for providing that example!

[EDIT] Actually, please ignore the below. I got the qa_chain working with @jrhe's code and the arun function. The reason it didn't work before was that I didn't pass in the parameters correctly, and exceptions don't show up because of the await.

After a lot of trial and error, at least that gives something to work with. Like @tron19920125 mentioned, though, it does seem to stop working as soon as you try to apply it to a qa_chain, and it's not the timeout that's the issue. I suspect there is something in the chain that doesn't apply 'await' or 'async' properly, but I haven't been able to find out what yet. If anyone reading this knows the solution, your assistance would definitely be appreciated.

jnak commented 1 year ago

I haven't tested it, but FYI RedisChatMessageHistory is blocking, so be careful not to call it from the main thread if you're running an async server such as FastAPI.
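
A minimal, untested sketch of one way around that: offload the blocking Redis round-trip to a worker thread. CHAT_REDIS_URL is a placeholder taken from the snippets above, and the import path may differ between versions.

import asyncio

from langchain.memory import RedisChatMessageHistory

CHAT_REDIS_URL = "redis://localhost:6379/0"  # placeholder, not from the thread


async def load_history_messages(message_id: str):
    def _load():
        # the blocking Redis call runs in the default thread pool executor
        history = RedisChatMessageHistory(message_id, url=CHAT_REDIS_URL, ttl=600)
        return history.messages

    return await asyncio.get_running_loop().run_in_executor(None, _load)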

FrancescoSaverioZuppichini commented 1 year ago

You can also use plain threads, something like the following:

# assuming this is inside your function associated with the chatbot

# in the queue we will store our streamed tokens
q = Queue()
# let's create our default chat
chat = ChatOpenAI(
    model_name=MODELS_NAMES[0],
    temperature=DEFAULT_TEMPERATURE,
    streaming=True,
    callbacks=[QueueCallback(q)],
)

# sentinel object used to signal that generation has finished
job_done = object()

messages.append(HumanMessage(content=message))

# run the blocking chat call in a worker thread and push the sentinel when done
def task():
    chat(messages)
    q.put(job_done)

t = Thread(target=task)
t.start()
chatbot_messages.append((message, ""))
content = ""

# drain tokens from the queue as they arrive and yield the partial chat history
while True:
    try:
        next_token = q.get(True, timeout=1)
        if next_token is job_done:
            break
        content += next_token
        chatbot_messages[-1] = (message, content)
        yield "", chatbot_messages
    except Empty:
        continue

And my callback:

class QueueCallback(BaseCallbackHandler):
    """Callback handler for streaming LLM responses to a queue."""

    def __init__(self, q):
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.q.put(token)

    def on_llm_end(self, *args, **kwargs: Any) -> None:
        return self.q.empty()
xiaoyuge886 commented 1 year ago

The default AsyncIteratorCallbackHandler has a bug. In the example by @jianhuihi, during the first chat, on_llm_end is triggered before self.queue is empty, causing an error when other.pop().cancel() is called. After working around that with if other.pop():, during the second chat the while not self.queue.empty() or not self.done.is_set() loop in aiter exits before on_llm_start (especially when executed inside a chain), so aiter ends immediately and returns empty messages.

Maybe we can call self.done.clear() before breaking out of the while loop, or use a user-controlled queue as @jrhe suggested.

Is there any solution for this? I use ConversationalRetrievalChain, but my workaround below (counting runs via self.run_times) is not a perfect solution:


import asyncio
import time
from abc import ABC
from typing import Any

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.schema import LLMResult


class AiAsyncIteratorCallbackHandler(AsyncIteratorCallbackHandler, ABC):
    """Callback handler that returns an async iterator."""

    queue: asyncio.Queue[str]

    done: asyncio.Event

    @property
    def always_verbose(self) -> bool:
        return True

    def __init__(self) -> None:
        super(AiAsyncIteratorCallbackHandler, self).__init__()
        self.run_times = 1
        self.run_time = int(time.time())

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.queue.put_nowait(token)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # TODO: find the right moment to call self.done.set()
        # only signal completion on the second LLM run (the answer), not on the
        # first run (the condensed question) of ConversationalRetrievalChain
        if self.run_times == 2:
            self.done.set()
        self.run_times += 1
ChinmayShrivastava commented 1 year ago

Hi, I have been struggling with the LangChain completion too. Could someone please have a look at my code? I'm not sure what is going wrong.

async def send_message_to_room(room_group_name, message, channel_layer):
    print('send_message_to_room')
    await channel_layer.group_send(
        room_group_name,
        {
            "type": "chat_message",
            "message": message,
        }
    )

class StreamingLLMCallbackHandler(BaseCallbackHandler):
    """Callback handler for streaming LLM responses to a queue."""

    def __init__(self, q: asyncio.Queue, roomname: str):
        self.q = q
        self.channel_layer = get_channel_layer()
        self.roomname = roomname

    def on_llm_new_token(self, token: str, **kwargs):
        self.q.put_nowait(token)

    async def process_queue(self):
        while True:
            print("Waiting for token")
            token = await self.q.get()
            if token=='<|endoftext|>':
                break
            print(f"Token received: {token}")
            await send_message_to_room(self.roomname, token, self.channel_layer) # this works

async def run_chat(roomname,
    question=question, texts=texts, responsetype="Detailed",
    system_message_with_response_type=system_message_with_response_type,
    human_message_with_response_type=human_message_with_response_type
):
    q = asyncio.Queue()
    callback_handler = StreamingLLMCallbackHandler(q, roomname=roomname)

    llm = ChatOpenAI(
        max_tokens=250,
        streaming=True,
        callbacks=[callback_handler]
    )

    # Start a separate asyncio task for processing the queue
    p = asyncio.create_task(callback_handler.process_queue())

    system_message_prompt = SystemMessagePromptTemplate.from_template(system_message_with_response_type)
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_message_with_response_type)

    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    prompt_value = chat_prompt.format_prompt(question=question, texts=texts, responsetype=responsetype)

    llm(prompt_value.to_messages())

    await q.put('<|endoftext|>')
    await p

The group_send() method sends messages to the channel, but the messages aren't received on the front end until the LLM has finished generating, and I am not sure why there is this buffering. Additionally, the buffering seems to happen twice: once after the completion is generated, after which process_queue finally starts operating, and again when process_queue has finished, after which the messages are actually sent to the group room by the consumer using the chat_message method. I'm not sure what I can do to make all of this happen simultaneously. I have tried async operations, and only this code, as attached, seems to work.

shengzhou1216 commented 1 year ago

AsyncIteratorCallbackHandler works. Here is the example code:


import asyncio
import unittest

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate


class TestLangchainAsync(unittest.IsolatedAsyncioTestCase):
    async def test_aiter(self):
        handler = AsyncIteratorCallbackHandler()
        llm = OpenAI(
            temperature=0,
            streaming=True,
            callbacks=[handler],
            openai_api_key="sk-xxxxx",
            openai_proxy="http://127.0.0.1:7890",
        )
        prompt = PromptTemplate(
            input_variables=["product"],
            template="What is a good name for a company that makes {product}?",
        )
        prompt = prompt.format(product="colorful socks")
        asyncio.create_task(llm.agenerate([prompt]))
        async for i in handler.aiter():
            print(i)
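
For anyone who asked about SSE rather than websockets earlier in the thread, roughly the same handler can be wired into a streaming HTTP response. A hedged, untested sketch, with FastAPI assumed purely for illustration:

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

app = FastAPI()


@app.get("/stream")
async def stream(question: str):
    handler = AsyncIteratorCallbackHandler()
    llm = ChatOpenAI(streaming=True, temperature=0, callbacks=[handler])
    chain = LLMChain(
        llm=llm,
        prompt=PromptTemplate.from_template("Answer briefly: {question}"),
    )

    async def event_stream():
        # run the chain concurrently and forward each token as an SSE data line
        task = asyncio.create_task(chain.arun(question=question))
        async for token in handler.aiter():
            yield f"data: {token}\n\n"
        await task

    return StreamingResponse(event_stream(), media_type="text/event-stream")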
eshaanagarwal commented 1 year ago

Hey, can anybody point me to how to use this with ConversationalRetrievalChain and the OpenAI API on Gradio? I would really appreciate it.

eshaanagarwal commented 1 year ago

(Quoting @bent-verbiage's comment above.)

Hey, can you please provide the code for this? It would be super helpful.

jvelezmagic commented 1 year ago

Another gist on the topic. c:

https://gist.github.com/jvelezmagic/03ddf4c452d011aae36b2a0f73d72f68#gistcomment-4599119

morteymike commented 1 year ago

Here's a concise gist I wrote on how to do this with threads and queues, using the great answer by @FrancescoSaverioZuppichini: https://gist.github.com/mortymike/70711b028311681e5f3c6511031d5d43

eshaanagarwal commented 1 year ago

Hi, but I need a solution that handles ConversationalRetrievalChain. I am not able to understand how to do that.

eshaanagarwal commented 1 year ago

If I try to use it with ConversationalRetrievalChain, it prints the condensed question rather than the answer. Here is the code I am using:


from threading import Thread
from typing import Any, Union, List, Dict
from queue import Empty, SimpleQueue

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult

def load_exisiting_db():
    embeddings = HuggingFaceInstructEmbeddings(model_name=bi_enc_dict[EMBED_MODEL],
                                    query_instruction='Represent the question for retrieving supporting paragraphs: ',
                                    embed_instruction='Represent the paragraph for retrieval: ')

    db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
    return db 

def load_retrieval_chain(vectorstore, model_name, streaming_callback):

    '''Load Chain'''

    # callbacks = [StreamingStdOutCallbackHandler()]
    callbacks = streaming_callback

    if model_name =='openai':
        chat_llm = ChatOpenAI(model_name = 'gpt-3.5-turbo',
                          callbacks=callbacks, 
                          temperature=0,
                          streaming = True
                         )
        question_generator = LLMChain(llm=chat_llm, prompt=CONDENSE_QUESTION_PROMPT)
        prompt = load_prompt(model_name='openai')
        doc_chain = load_qa_chain(llm=chat_llm,chain_type="stuff",prompt=prompt) 

    chain = ConversationalRetrievalChain(retriever=vectorstore.as_retriever(search_kwargs={"k": target_source_chunks }), 
                                     question_generator=question_generator, 
                                     combine_docs_chain=doc_chain, 
                                     memory=memory, 
                                     return_source_documents=True, 
                                     get_chat_history=lambda h :h)

    return chain

job_done = object()
class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        # print("tiktik token ", token)
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""

job_done = object() # signals the processing is done

q = SimpleQueue()

def load_streaming_callback(streaming):
    if streaming:
        streaming_callback =[StreamingGradioCallbackHandler(q),
                                    StreamingStdOutCallbackHandler()]
    else :
        streaming_callback =[StreamingStdOutCallbackHandler()]

    return streaming_callback

def load_chatbot_chain(streaming=True):
    db = load_exisiting_db()
    streaming_callback = load_streaming_callback(streaming)
    chain = load_retrieval_chain(db, model_name = 'openai', streaming_callback=streaming_callback)
    return chain

chatbot_chain = load_chatbot_chain()

def stream_run(question):
    answer = chatbot_chain({'question': question})
    q.put(job_done)
    # return answer['answer']

def stream_bot(history):
    user_question = history[-1][0]
    thread = Thread(target=stream_run, kwargs={"question": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True)  # blocks until an input is available
        print("next token ", next_token)
        if not isinstance(next_token, str):
            break
        history[-1][1] += next_token
        yield history
    thread.join()
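
One commonly suggested fix for the condensed-question problem (not from this thread, so treat it as a hedged sketch) is to give the condense-question step its own non-streaming LLM and attach the streaming callback only to the LLM that writes the final answer. StreamingGradioCallbackHandler, q, db, target_source_chunks, and memory are assumed from the snippet above:

from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain, LLMChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.conversational_retrieval.prompts import CONDENSE_QUESTION_PROMPT

# the streaming callback goes only on the LLM that produces the final answer
streaming_llm = ChatOpenAI(
    model_name='gpt-3.5-turbo',
    temperature=0,
    streaming=True,
    callbacks=[StreamingGradioCallbackHandler(q)],
)
# the question-condensing step gets its own quiet, non-streaming LLM
condense_llm = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)

question_generator = LLMChain(llm=condense_llm, prompt=CONDENSE_QUESTION_PROMPT)
doc_chain = load_qa_chain(llm=streaming_llm, chain_type="stuff")

chain = ConversationalRetrievalChain(
    retriever=db.as_retriever(search_kwargs={"k": target_source_chunks}),
    question_generator=question_generator,
    combine_docs_chain=doc_chain,
    memory=memory,
    get_chat_history=lambda h: h,
)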
asparagusbeef commented 1 year ago

I am trying to stream outputs directly from an agent to an API endpoint, and I can't seem to get it to work! The async behavior is very odd: sometimes it prints the same response twice, and sometimes it doesn't print a response at all. What am I doing wrong?

I will include all my code here, because I am really unsure which part of the chain is the cause. Note that I am a complete beginner when it comes to coroutines and async operations.

MyCustomCallBacks.py:

from typing import Any, Dict, List
import tiktoken
from langchain.callbacks.base import BaseCallbackHandler
from loguru import logger
from langchain.schema import LLMResult

class ToolsInputCallBackHandler(BaseCallbackHandler):

    def __init__(self, logger):
        self.logger = logger

    def on_agent_action(self, serialized, **kwargs):
        self.logger.info(serialized.log)

class ToolsOutputCallBackHandler(BaseCallbackHandler):

    def on_tool_end(self, serialized, **kwargs):
        logger.info(serialized)

class TokenCostProcess:
    ...  # not relevant here

class CostCalcHandler(BaseCallbackHandler):
    ...  # not relevant here

Agent.py

from BotWrapper import Tool1, Tool2, Tool3, getSystemPrompt
from MyCustomCallBacks import ToolsInputCallBackHandler, ToolsOutputCallBackHandler, TokenCostProcess, CostCalcHandler

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
from langchain.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryMemory, CombinedMemory
from langchain.agents import initialize_agent, AgentType
from langchain.schema.messages import SystemMessage
from typing import Any
from loguru import logger
from ansi2html import Ansi2HTMLConverter
import io
import time
from typing import Awaitable
import asyncio

class MyAgent:
    def __init__(self, 
                 model: str="gpt-3.5-turbo-16k-0613"):
        system_message = SystemMessage(content=getSystemPrompt())

        self.logger = self.initialize_logger()

        self.tools = []
        for tool in (Tool1, Tool2, Tool3):
            tool.callbacks = [ToolsOutputCallBackHandler()]
            self.tools.append(tool)
        self.model = model
        self.token_cost_process = TokenCostProcess()
        self.async_callback = AsyncIteratorCallbackHandler() # Faulty?
        self.llm = ChatOpenAI(
            model=self.model, 
            temperature=1,
            streaming=True, 
            callbacks=[CostCalcHandler(self.model, self.token_cost_process)]
        )
        self.agent_kwargs = {
            "extra_prompt_messages": [MessagesPlaceholder(variable_name="chat_history_lines"), 
                                      MessagesPlaceholder(variable_name="summary")],
            "system_message": system_message
        }

        self.tools_input_handler = ToolsInputCallBackHandler(self.logger)
        self.memory = self.reset_memory()
        self.agent = initialize_agent(
            tools=self.tools, 
            llm=self.llm, 
            agent=AgentType.OPENAI_FUNCTIONS,
            agent_kwargs=self.agent_kwargs,
            memory=self.memory,
            callbacks=[self.tools_input_handler]
        )
    def reset_memory(self):
        conv_memory = ConversationBufferWindowMemory(k=2, 
                                                    memory_key="chat_history_lines", 
                                                    return_messages=True, 
                                                    ai_prefix='Alfred', 
                                                    human_prefix='Caller',
                                                    input_key="input")
        summary_memory = ConversationSummaryMemory(llm=OpenAI(max_tokens=512), 
                                                memory_key="summary", 
                                                return_messages=True, 
                                                ai_prefix='Alfred', 
                                                human_prefix='Caller',
                                                input_key="input")
        memory = CombinedMemory(memories = [summary_memory, conv_memory])    
        return memory

    def initialize_logger(self):
        logger.remove()
        self.log_stream = io.StringIO()
        logger.add(self.log_stream, colorize=True, enqueue=True)
        return logger

    async def wrap_done(self, fn: Awaitable, event: asyncio.Event):  # Code I took from git. Perhaps I am using it wrong.
        """Wrap an awaitable with a event to signal when it's done or an exception is raised."""
        try:
            await fn
        except Exception as e:
            # TODO: handle exception
            print(f"Caught exception: {e}")
        finally:
            # Signal the aiter to stop.
            event.set()

    async def __call__(self, user_input):
        task = asyncio.create_task(self.wrap_done(
        self.agent.acall(user_input, callbacks=[self.async_callback]),
        self.async_callback.done)
        )
        complete_text = ''
        async for token in self.async_callback.aiter():
            complete_text+=token
            yield token
        await task

    def _get_log(self):
        content = self.log_stream.getvalue()
        conv = Ansi2HTMLConverter()
        html = conv.convert(content, full=True)
        html = html.replace(r'\n', '\n')
        return html

    def get_log(self):
        heading = "Get Log Was Called! Summarizing Costs...\n\n"
        self.logger.info(heading+self.token_cost_process.get_cost_summary(self.model))
        time.sleep(1)
        return self._get_log()

async def print_agent_response(agentCall):
    async for token in agentCall:
        print(token, end="")
    print()

agent = MyAgent()
response = agent('Write a short poem in pirate')
asyncio.run(print_agent_response(response))

Any help would be appreciated!! Thanks a lot :)

nfcampos commented 1 year ago

This is now available as the new .stream() and .astream() methods, see https://python.langchain.com/docs/expression_language/interface#stream
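
For those asking below, a minimal sketch of what that interface looks like (assuming a LangChain version that exposes the Expression Language runnables; the chunk attributes may differ slightly between versions):

import asyncio

from langchain.chat_models import ChatOpenAI

chat = ChatOpenAI(temperature=0)

# synchronous streaming: .stream() yields message chunks as they arrive
for chunk in chat.stream("Translate to French: I love programming."):
    print(chunk.content, end="", flush=True)
print()


# asynchronous streaming with .astream()
async def main():
    async for chunk in chat.astream("Translate to French: I love programming."):
        print(chunk.content, end="", flush=True)
    print()

asyncio.run(main())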

BuckLearnsCode commented 1 year ago

Does anyone have an example of using .stream() or .astream()? I'm fairly new to coding and I'm not following the documentation @nfcampos linked to. Much appreciated.

evanbrociner commented 12 months ago

An example would be great!