langchain-ai / langchain

πŸ¦œπŸ”— Build context-aware reasoning applications
https://python.langchain.com
MIT License
88.89k stars 14k forks source link

Streaming of Source Documents not working in ConversationalRetrievalChain #19259

Closed anigit-2009 closed 6 days ago

anigit-2009 commented 3 months ago

Checked other resources

Example Code

import `json`
import redis
from fastapi import APIRouter, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from langchain.chains import ConversationalRetrievalChain, ConversationChain
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.callbacks.manager import AsyncCallbackManager
from fastapi.responses import StreamingResponse
from typing import Any, Awaitable, Callable, Iterator, Optional, Union
from langchain.chains.conversational_retrieval.prompts import (
    CONDENSE_QUESTION_PROMPT,
    QA_PROMPT,
)
from langchain.chains.llm import LLMChain
from langchain.chains.question_answering import load_qa_chain
from langchain_community.chat_models import AzureChatOpenAI
from langchain_openai import AzureOpenAIEmbeddings
from langchain.memory import ConversationBufferMemory, ConversationBufferWindowMemory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
)
from langchain.prompts.prompt import PromptTemplate
from langchain_community.vectorstores import Milvus
from pydantic import BaseModel, validator
from typing import Optional
from starlette.types import Send
from genai_openwork_backend.services.coroutine.loop import get_loop
from genai_openwork_backend.app.api.chat_history.views import (
    get_recent_chats_rag,
    cache_chat
)
from genai_openwork_backend.db.connection import get_connection, aget_connection,release_connection
from genai_openwork_backend.config import config
from datetime import datetime
import re
from enum import Enum as PyEnum

router = APIRouter()

redis_host = config.redis['REDIS_HOST']
redis_port = config.redis['REDIS_PORT']
openai_api_version = config.llm["OPENAI_API_VERSION"]
deployment_name=config.llm['DEPLOYMENT_NAME']
model_name=config.llm['LLM_MODEL']
openai_api_base=config.llm['AZURE_OPENAI_ENDPOINT']
deployment=config.llm['EMBEDDING']
model=config.llm['MODEL']
openai_api_type=config.llm['OPENAI_API_TYPE']
milvus_host = config.vectordb['MILIVS_HOST']
milvus_port = config.vectordb['MILIVUS_PORT']

# No change here - Using the default version
CONDENSE_QUESTION_PROMPT = PromptTemplate(
    input_variables=[
        "chat_history",
        "question",
    ],
    template="Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.\n\nChat History:\n{chat_history}\nFollow Up Input: {question}\nStandalone question:",
)

# QA prompt updated
_template = """You are a helpful AI assisstant. The following is a friendly conversation between a human and you.
Use the following documents provided as context to answer the question at the end . If you don't know the answer, just say you don't know. DO NOT try to make up an answer.
If the question is not related to the context, politely respond that you are tuned to only answer questions that are related to the context.

Also, generate three brief follow-up questions that the user would likely ask next. Try not to repeat questions that have already been asked. Only generate questions in next line with a tag 'Next Questions' as a Markdown list. 

{question}
=========
{context}
=========
Answer:"""

variables = ["context", "question"]

QA_PROMPT = PromptTemplate(
    template=_template,
    input_variables=variables,
)

class AllowedChatModels(str, PyEnum):
    value1 = "gpt-3.5"
    value2 = "gpt-4"

class ConversationStyles(str, PyEnum):
    value1 = "precise"
    value2 = "balanced"
    value3 = "creative"

class RagChatRequest(BaseModel):
    """Request model for chat requests.
    Includes the conversation ID and the message from the user.
    """

    user_id: str
    conversation_id: str
    question: str
    collection: Optional[list] = ["all"]
    vectordb_collection : Optional[str] = "openwork"
    chatModel : Optional[AllowedChatModels] = "gpt-3.5"
    conversationStyle : Optional[ConversationStyles] = "precise"

Sender = Callable[[Union[str, bytes]], Awaitable[None]]

class EmptyIterator(Iterator[Union[str, bytes]]):
    def __iter__(self):
        return self

    def __next__(self):
        raise StopIteration

class AsyncStreamCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming, inheritance from AsyncCallbackHandler."""

    def __init__(self, send: Sender):
        super().__init__()
        self.send = send

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Rewrite on_llm_new_token to send token to client."""
        await self.send(f"{token}")

class ChatOpenAIStreamingResponse(StreamingResponse):
    """Streaming response for openai chat model, inheritance from StreamingResponse."""

    def __init__(
        self,
        generate: Callable[[Sender], Awaitable[None]],
        request,
        status_code: int = 200,
        media_type: Optional[str] = None,
    ) -> None:
        super().__init__(
            content=EmptyIterator(),
            status_code=status_code,
            media_type=media_type,
        )
        self.generate = generate
        self.request = request
        self.answer = b''

    async def stream_response(self, send: Send) -> None:
        """Rewrite stream_response to send response to client."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            },
        )

        async def send_chunk(chunk: Union[str, bytes]):
            if not isinstance(chunk, bytes):
                chunk = chunk.encode(self.charset)
                self.answer += chunk
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        # send body to client
        await self.generate(send_chunk)

        # send empty body to client to close connection
        await send({"type": "http.response.body", "body": b"", "more_body": False})

def transformConversationStyleToTemperature(style : str):

    style_temp_obj = {
        "precise" : 1,
        "balanced" : 0.5,
        "creative" : 0.1
    }

    return style_temp_obj.get(style)

def send_message_tredence_llm(
    query: RagChatRequest,
) -> Callable[[Sender], Awaitable[None]]:
    async def generate(send: Sender):

        temperature = transformConversationStyleToTemperature(query.conversationStyle)

        chat_model = AzureChatOpenAI(
            streaming=True,
            azure_endpoint=openai_api_base,
            deployment_name=deployment_name,
            model_name=model_name,
            openai_api_version=openai_api_version,
            verbose=True,
        )

        chat_model2 = AzureChatOpenAI(
            streaming=False,
            azure_endpoint=openai_api_base,
            deployment_name=deployment_name,
            model_name=model_name,
            openai_api_version=openai_api_version,
            verbose=True,
        )
        chat_model.temperature = temperature
        chat_model2.temperature = temperature

        embeddings = AzureOpenAIEmbeddings(
            deployment=deployment,
            model=str(model),
            azure_endpoint=openai_api_base,
            openai_api_type=openai_api_type,
            openai_api_version=openai_api_version,
        )

        vectorstore = Milvus(
            embeddings,
            collection_name=query.vectordb_collection,
            connection_args={"host": milvus_host, "port": milvus_port},
        )

        chain_input = query.question

        memory = ConversationBufferWindowMemory( k=10, return_messages=True,memory_key="chat_history")

        chat_list = await get_recent_chats_rag(query.conversation_id)

        if(len(chat_list)):
            for c in chat_list:
                memory.save_context({"input": c["input"]}, {"output": c["output"]})

        # Set up the chain
        question_generator = LLMChain(
            llm=chat_model2,
            prompt=CONDENSE_QUESTION_PROMPT,
        )

        doc_chain = load_qa_chain(
            llm=chat_model,
            chain_type="stuff",
            prompt=QA_PROMPT,
            # callback_manager=AsyncCallbackManager(
            #     [AsyncStreamCallbackHandler(send)],
            # ),
        )
        if len(query.collection) == 1 and query.collection[0] == "all":
                expression = ""
        else:
            expression = f'group in ["{query.collection[0]}"'
            for i in range(1,len(query.collection)):
                expression += f',"{query.collection[i]}"'
            expression += "]"
        print("expression", expression)

        chain = ConversationalRetrievalChain(
            memory=memory,
            combine_docs_chain=doc_chain,
            question_generator=question_generator,
            retriever=vectorstore.as_retriever(
                search_type="similarity", search_kwargs={"k": 4, "expr": f"{expression}"}
            ),
            verbose=True,
            return_source_documents=True
        )

        history = memory.chat_memory.messages
        print(history)
        await chain.acall(chain_input, callbacks=[AsyncStreamCallbackHandler(send)])

    return generate

@router.post("/rag/stream")
async def stream(request: RagChatRequest):

    return ChatOpenAIStreamingResponse(
        send_message_tredence_llm(request),
        request,
        media_type="text/event-stream",
    )

Error Message and Stack Trace (if applicable)

Entering new ConversationalRetrievalChain chain...

Finished chain. 2024-03-19 09:40:13.368 | ERROR | trace_id=0 | span_id=0 | uvicorn.protocols.http.httptools_impl:run_asgi:424 - Exception in ASGI application

Traceback (most recent call last):

File "", line 1, in File "/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main exitcode = _main(fd, parent_sentinel) β”‚ β”‚ β”” 4 β”‚ β”” 10 β”” <function _main at 0x7f0ec497bd00> File "/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/multiprocessing/spawn.py", line 129, in _main return self._bootstrap(parent_sentinel) β”‚ β”‚ β”” 4 β”‚ β”” <function BaseProcess._bootstrap at 0x7f0ec4b627a0> β”” File "/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap self.run() β”‚ β”” <function BaseProcess.run at 0x7f0ec4b61e10> β”” File "/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) β”‚ β”‚ β”‚ β”‚ β”‚ β”” {'config': <uvicorn.config.Config object at 0x7f0ec4ca2950>, 'target': <bound method Server.run of <uvicorn.server.Server obj... β”‚ β”‚ β”‚ β”‚ β”” β”‚ β”‚ β”‚ β”” () β”‚ β”‚ β”” β”‚ β”” <function subprocess_started at 0x7f0ec3fc9ea0> β”” File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/uvicorn/_subprocess.py", line 76, in subprocess_started target(sockets=sockets) β”‚ β”” [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 1785)>] β”” <bound method Server.run of <uvicorn.server.Server object at 0x7f0ec4ca28f0>> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/uvicorn/server.py", line 60, in run return asyncio.run(self.serve(sockets=sockets)) β”‚ β”‚ β”‚ β”‚ β”” [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 1785)>] β”‚ β”‚ β”‚ β”” <function Server.serve at 0x7f0ec3fc9360> β”‚ β”‚ β”” <uvicorn.server.Server object at 0x7f0ec4ca28f0> β”‚ β”” <function run at 0x7f0ec4991360> β”” <module 'asyncio' from '/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/asyncio/init.py'> File "/home/azureuser/.pyenv/versions/3.10.13/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) β”‚ β”‚ β”” <coroutine object Server.serve at 0x7f0ec3eb21f0> β”‚ β”” <method 'run_until_complete' of 'uvloop.loop.Loop' objects> β””

File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 419, in run_asgi result = await app( # type: ignore[func-returns-value] β”” <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x7f0e8dc1cd00> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in call return await self.app(scope, receive, send) β”‚ β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.send of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc707... β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <fastapi.applications.FastAPI object at 0x7f0ea55efb80> β”” <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x7f0e8dc1cd00> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/fastapi/applications.py", line 270, in call await super().call(scope, receive, send) β”‚ β”‚ β”” <bound method RequestResponseCycle.send of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc707... β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/applications.py", line 124, in call await self.middleware_stack(scope, receive, send) β”‚ β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.send of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc707... β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <starlette.middleware.errors.ServerErrorMiddleware object at 0x7f0e8dc1df30> β”” <fastapi.applications.FastAPI object at 0x7f0ea55efb80> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in call raise exc File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in call await self.app(scope, receive, _send) β”‚ β”‚ β”‚ β”‚ β”” <function ServerErrorMiddleware.call.._send at 0x7f0e8dc743a0> β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <starlette.middleware.exceptions.ExceptionMiddleware object at 0x7f0e8dc1c850> β”” <starlette.middleware.errors.ServerErrorMiddleware object at 0x7f0e8dc1df30> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in call raise exc File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in call await self.app(scope, receive, sender) β”‚ β”‚ β”‚ β”‚ β”” <function ExceptionMiddleware.call..sender at 0x7f0e8dc74430> β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <fastapi.middleware.asyncexitstack.AsyncExitStackMiddleware object at 0x7f0ec1d63bb0> β”” <starlette.middleware.exceptions.ExceptionMiddleware object at 0x7f0e8dc1c850> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in call raise e File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in call await self.app(scope, receive, send) β”‚ β”‚ β”‚ β”‚ β”” <function ExceptionMiddleware.call..sender at 0x7f0e8dc74430> β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <fastapi.routing.APIRouter object at 0x7f0ec1dc0be0> β”” <fastapi.middleware.asyncexitstack.AsyncExitStackMiddleware object at 0x7f0ec1d63bb0> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/routing.py", line 706, in call await route.handle(scope, receive, send) β”‚ β”‚ β”‚ β”‚ β”” <function ExceptionMiddleware.call..sender at 0x7f0e8dc74430> β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <function Route.handle at 0x7f0ec31b5000> β”” APIRoute(path='/api/openwork/rag/stream', name='stream', methods=['POST']) File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle await self.app(scope, receive, send) β”‚ β”‚ β”‚ β”‚ β”” <function ExceptionMiddleware.call..sender at 0x7f0e8dc74430> β”‚ β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”‚ β”” <function request_response..app at 0x7f0e8dc23d00> β”” APIRoute(path='/api/openwork/rag/stream', name='stream', methods=['POST']) File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/routing.py", line 69, in app await response(scope, receive, send) β”‚ β”‚ β”‚ β”” <function ExceptionMiddleware.call..sender at 0x7f0e8dc74430> β”‚ β”‚ β”” <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.httptools_impl.RequestResponseCycle object at 0x7f0e8dc... β”‚ β”” {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'http_version': '1.1', 'server': ('127.0.0.1', 1785), 'cl... β”” <genai_openwork_backend.app.api.openwork.views.ChatOpenAIStreamingResponse object at 0x7f0e8dc70c40> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/responses.py", line 266, in call async with anyio.create_task_group() as task_group: β”‚ β”‚ β”” <anyio._backends._asyncio.TaskGroup object at 0x7f0e8dc70b20> β”‚ β”” <function create_task_group at 0x7f0ec3e409d0> β”” <module 'anyio' from '/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/an... File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 597, in aexit raise exceptions[0] β”” [ValueError("One output key expected, got dict_keys(['answer', 'source_documents'])")] File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/starlette/responses.py", line 269, in wrap await func() β”” functools.partial(<bound method ChatOpenAIStreamingResponse.stream_response of <genai_openwork_backend.app.api.openwork.views...

File "/home/azureuser/anindya/genai_openwork_backend/genai_openwork_backend/app/api/openwork/views.py", line 233, in stream_response await self.generate(send_chunk) β”‚ β”‚ β”” <function ChatOpenAIStreamingResponse.stream_response..send_chunk at 0x7f0e8dc74a60> β”‚ β”” <function send_message_tredence_llm..generate at 0x7f0e8dc74550> β”” <genai_openwork_backend.app.api.openwork.views.ChatOpenAIStreamingResponse object at 0x7f0e8dc70c40>

File "/home/azureuser/anindya/genai_openwork_backend/genai_openwork_backend/app/api/openwork/views.py", line 392, in generate await chain.acall(chain_input, callbacks=[AsyncStreamCallbackHandler(send)]) β”‚ β”‚ β”‚ β”‚ β”” <function ChatOpenAIStreamingResponse.stream_response..send_chunk at 0x7f0e8dc74a60> β”‚ β”‚ β”‚ β”” <class 'genai_openwork_backend.app.api.openwork.views.AsyncStreamCallbackHandler'> β”‚ β”‚ β”” 'explain about supply chain tower' β”‚ β”” <function Chain.acall at 0x7f0eb5e0c940> β”” ConversationalRetrievalChain(memory=ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10), ve...

File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain_core/_api/deprecation.py", line 154, in awarning_emitting_wrapper return await wrapped(*args, **kwargs) β”‚ β”‚ β”” {'callbacks': [<genai_openwork_backend.app.api.openwork.views.AsyncStreamCallbackHandler object at 0x7f0e8db997b0>]} β”‚ β”” (ConversationalRetrievalChain(memory=ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10), v... β”” <function Chain.acall at 0x7f0eb5e0c430> File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain/chains/base.py", line 413, in acall return await self.ainvoke( β”‚ β”” <function Chain.ainvoke at 0x7f0eb5ddbb50> β”” ConversationalRetrievalChain(memory=ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10), ve... File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain/chains/base.py", line 211, in ainvoke final_outputs: Dict[str, Any] = self.prep_outputs( β”‚ β”‚ β”‚ β”” <function Chain.prep_outputs at 0x7f0eb5e0c160> β”‚ β”‚ β”” ConversationalRetrievalChain(memory=ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10), ve... β”‚ β”” typing.Any β”” typing.Dict File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain/chains/base.py", line 440, in prep_outputs self.memory.save_context(inputs, outputs) β”‚ β”‚ β”‚ β”‚ β”” {'answer': 'The article explains that a Supply Chain Control Tower (SCCT) is a cross-departmental, system-integrated β€œinforma... β”‚ β”‚ β”‚ β”” {'question': 'explain about supply chain tower', 'chat_history': []} β”‚ β”‚ β”” <function BaseChatMemory.save_context at 0x7f0eb5c085e0> β”‚ β”” ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10) β”” ConversationalRetrievalChain(memory=ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10), ve... File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain/memory/chat_memory.py", line 37, in save_context input_str, output_str = self._get_input_output(inputs, outputs) β”‚ β”‚ β”‚ β”” {'answer': 'The article explains that a Supply Chain Control Tower (SCCT) is a cross-departmental, system-integrated β€œinforma... β”‚ β”‚ β”” {'question': 'explain about supply chain tower', 'chat_history': []} β”‚ β”” <function BaseChatMemory._get_input_output at 0x7f0eb5c08550> β”” ConversationBufferWindowMemory(return_messages=True, memory_key='chat_history', k=10) File "/home/azureuser/.pyenv/versions/3.10.13/envs/genai_openwork_backend_ani/lib/python3.10/site-packages/langchain/memory/chat_memory.py", line 29, in _get_input_output raise ValueError(f"One output key expected, got {outputs.keys()}")

ValueError: One output key expected, got dict_keys(['answer', 'source_documents'])

Description

I am trying to return the source documents along with the answer as a part of streamed response. I have put return_source_documents=True in ConversationalRetrievalChain parameters, however error is coming. If I comment it, then the answer gets streamed without any error. How to return the source documents in the stream?

System Info

langchain==0.1.0 langchain-community==0.0.20 langchain-core==0.1.23 langchain-openai==0.0.5 openinference-instrumentation-langchain==0.1.12

python --> 3.10.13

derekhsu commented 3 months ago

Any update? I have the same issue too.

derekhsu commented 3 months ago

I have the answer, ref: https://github.com/langchain-ai/langchain/issues/2256