Chainlit / chainlit

Build Conversational AI in minutes ⚡️
https://docs.chainlit.io
Apache License 2.0
6.24k stars 799 forks source link

Assistant error message appears before eventual result which incorrectly is placed as part of step? #760

Open scheung38 opened 4 months ago

scheung38 commented 4 months ago

Have two issues here:

1) Why getting immediately message:

_Chatbot

I'm sorry, but I am unable to provide real-time data or access specific *** information. You may need to check your company's records or database to gather..._

even though eventually NL-to-SQL result appears but as part of step?

2) How to get NL-to-SQL result as final and not embedded as part of the steps? ie User needs to click Step 3 in order to see the embedded result instead of the expected final result appearing.

Below I merged the bigQuery with the resume-chat example and used PostgreSQL instead of BigQuery:

Screenshot 2024-02-22 at 12 53 12

from dotenv import dotenv_values

from operator import itemgetter

from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import Runnable, RunnablePassthrough, RunnableLambda
from langchain.schema.runnable.config import RunnableConfig
from langchain.memory import ConversationBufferMemory

from chainlit.types import ThreadDict

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from datetime import date

from openai import AsyncOpenAI
from google.cloud import bigquery

import chainlit as cl
from chainlit.playground.providers.openai import ChatOpenAI 
from langchain.chat_models import ChatOpenAI as ResumeChatOpenAI

from fastapi import APIRouter, Query, HTTPException
import pandas as pd
from pydantic import BaseModel
from typing import Union
from langchain.sql_database import SQLDatabase
from langchain.memory import ConversationBufferWindowMemory
from langchain_experimental.sql import SQLDatabaseChain, SQLDatabaseSequentialChain  
from pyodbc import ProgrammingError, OperationalError, NotSupportedError

import logging

config=dotenv_values(".env")

db_name=config["db_name"]
host=config["host"]
user=config["user"]
password=config["password"]
port=config["port"]

# Database connection string
DATABASE_URL = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db_name}'
print("DATABASE_URL: {}".format(DATABASE_URL))

# Create a database engine
engine = create_engine(DATABASE_URL)

# Create a Session class
Session = sessionmaker(bind=engine)

openai_client = AsyncOpenAI()

# settings = {"model": "gpt-3.5-turbo", "temperature": 0, "stop": ["```"]}
settings = {"model": "gpt-4", "temperature": 0, "stop": ["```"]}

sql_query_prompt = """You have PostgreSQL tables named `my_reports`, `my_invoices`, `my_logistics`, `payment_reports`,  `my_purchase_orders` and 'transactions' in the database `mydata`.

Write an PostgreSQL query, but avoid MS SQL or MySQL query to retrieve the full order based on the given question:

{input}
"""

explain_query_result_prompt = """Today is {date}
You received a query from a supplier or buyer regarding these tables.
They executed a PostgreSQL query, but avoid MS SQL or MySQL query, and provided the results in Markdown table format.
Analyze the table and explain the problem to the supplier or buyer only if there is error in SQL execution, else do not show any error.

Markdown Table:

{table}


Short and concise analysis:
"""

class Text(BaseModel):
    text: str
    activeChatId: str

llm = ResumeChatOpenAI(model_name="gpt-4", temperature=0, api_key=config['OPENAI_API_KEY']) 
chat_template = """ Based on the schema given {info} write an executable Microsoft SQL Server query for the user input, avoid MySQL query. 
        Execute it in the database and get sql results. Make a response to user from mssql results based on 
        the question. 
        Input: "user input"
        SQL query: "SQL Query here"
        """
chat_prompt = ChatPromptTemplate.from_messages([
    ('system', chat_template),
    MessagesPlaceholder(variable_name='history'),
    ('human', "{input}")
])

db = SQLDatabase.from_uri(DATABASE_URL)
table_info = db.table_info[:10]
m1 = ConversationBufferWindowMemory(k=4,return_messages=True)
# db_chain = SQLDatabaseChain.from_llm(llm, db,verbose = True, use_query_checker=True)
db_chain = SQLDatabaseSequentialChain.from_llm(llm, db,verbose = True, use_query_checker=True)

def setup_runnable():
    memory = cl.user_session.get("memory")  # type: ConversationBufferMemory
    model = ResumeChatOpenAI(streaming=True)
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful chatbot"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{question}"),
        ]
    )

    runnable = (
        RunnablePassthrough.assign(
            history=RunnableLambda(memory.load_memory_variables) | itemgetter("history")
        )
        | prompt
        | model
        | StrOutputParser()
    )
    cl.user_session.set("runnable", runnable)

@cl.password_auth_callback
def auth():
    return cl.User(identifier="test")

@cl.on_chat_start
async def on_chat_start():
    cl.user_session.set("memory", ConversationBufferMemory(return_messages=True))
    setup_runnable()    

@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
    memory = ConversationBufferMemory(return_messages=True)
    root_messages = [m for m in thread["steps"] if m["parentId"] == None]
    for message in root_messages:
        if message["type"] == "USER_MESSAGE":
            memory.chat_memory.add_user_message(message["output"])
        else:
            memory.chat_memory.add_ai_message(message["output"])

    cl.user_session.set("memory", memory)

    setup_runnable()

# FROM BIGQUERY    
@cl.step(type="llm")
async def gen_query(human_query: str):
    current_step = cl.context.current_step
    current_step.generation = cl.ChatGeneration(
        provider=ChatOpenAI.id,
        messages=[
            cl.GenerationMessage(
                role="user",
                template=sql_query_prompt,
                formatted=sql_query_prompt.format(input=human_query),
            )
        ],
        settings=settings,
        inputs={"input": human_query},
    )

    # Call OpenAI and stream the message
    stream_resp = await openai_client.chat.completions.create(
        messages=[m.to_openai() for m in current_step.generation.messages],
        stream=True,
        **settings
    )
    async for part in stream_resp:
        token = part.choices[0].delta.content or ""
        if token:
            await current_step.stream_token(token)

    current_step.language = "sql"
    current_step.generation.completion = current_step.output

    return current_step.output

@cl.step
async def execute_query(query):
    # Execute the SQL query
    # query_job = client.query(query)

    # # Wait for the query to complete
    # query_job.result()

    # # Get the query results
    # results = query_job.to_dataframe()

    # markdown_table = results.to_markdown(index=False)

    # NEW PostgreSQL 
    # Create a session
    session = Session()

    result = session.execute(text(query))
    # result = session.execute(query)
    print("result: {}".format(result))

    # Convert the result to a pandas DataFrame for easier Markdown conversion
    df = pd.DataFrame(result.fetchall(), columns=result.keys())
    print("df: {}".format(df))

    markdown_table = df.to_markdown(index=False)
    print("markdown_table: {}".format(markdown_table))
    # Close the session
    session.close()

    return markdown_table

@cl.step(type="llm")
async def analyze(table):
    current_step = cl.context.current_step
    today = str(date.today())
    current_step.generation = cl.ChatGeneration(
        provider=ChatOpenAI.id,
        messages=[
            cl.GenerationMessage(
                role="user",
                template=explain_query_result_prompt,
                formatted=explain_query_result_prompt.format(date=today, table=table),
            )
        ],
        settings=settings,
        inputs={"date": today, "table": table},
    )

    final_answer = cl.Message(content="")
    await final_answer.send()

    # Call OpenAI and stream the message
    stream = await openai_client.chat.completions.create(
        messages=[m.to_openai() for m in current_step.generation.messages],
        stream=True,
        **settings
    )
    async for part in stream:
        token = part.choices[0].delta.content or ""
        if token:
            await final_answer.stream_token(token)

    final_answer.actions = [
        cl.Action(name="take_action", value="action", label="Take action")
    ]
    await final_answer.update()

    current_step.output = final_answer.content
    current_step.generation.completion = final_answer.content

    return current_step.output

@cl.step(type="run")
async def chain(human_query: str):
    # sql_query = await gen_query(human_query)
    # table = await execute_query(sql_query)
    # analysis = await analyze(table)
    # return analysis 

    res = await text(human_query)   
    return res

# @cl.step(type="run")
@cl.step(type="llm")
# async def text(text: Text, history: Union[str, None] = []): #, history: Optional[str]= Query(None, description="test")): # :str | None = []):
async def text(human_query: str, history: Union[str, None] = []): #, history: Optional[str]= Query(None, description="test")): # :str | None = []):
    history = history or [];
    try:        
        print('\033[31m' + 'human:' + '\033[0m', end='')
        # prompt = text.text # input()
        prompt = human_query # input()

        if prompt != '':
            chat = m1.load_memory_variables({})['history']
            formatted_prompt = chat_prompt.format(info=table_info, history=chat, input=prompt)                  
            response = db_chain.run(formatted_prompt)
            m1.save_context({'input': prompt}, {'output': response})                   

            return response # It is of type: <class 'str'>

    except (ProgrammingError, NotSupportedError, OperationalError, ValueError, TypeError, Exception) as e:
        print("Exception occurred: {}".format(str(e)))
        logging.error("Exception occurred: {}".format(str(e)))
        raise HTTPException(status_code=422, detail=f"Error in /text endpoint: {str(e)}")

# FROM RESUME-CHAT
@cl.on_message
async def on_message(message: cl.Message):

    # # await chain(message.content)
    # await text(message.content)

    memory = cl.user_session.get("memory")  # type: ConversationBufferMemory

    runnable = cl.user_session.get("runnable")  # type: Runnable

    res = cl.Message(content="")

    async for chunk in runnable.astream(
        {"question": message.content},
        config=RunnableConfig(callbacks=[cl.LangchainCallbackHandler()]),
    ):
        await res.stream_token(chunk)

    await chain(message.content)
    # await text(message.content)   
    await res.send()

    memory.chat_memory.add_user_message(message.content)
    memory.chat_memory.add_ai_message(res.content)

# FROM BIGQUERY
# @cl.on_message
# async def main(message: cl.Message):
#     await chain(message.content)  

@cl.action_callback(name="take_action")
async def take_action(action: cl.Action):
    await cl.Message(content="Contacting supplier/buyer...").send()

# @cl.oauth_callback
# def auth_callback(provider_id: str, token: str, raw_user_data, default_app_user):
#     if provider_id == "google":
#         if "@chainlit.io" in raw_user_data["email"]:
#             return default_app_user
#     return None
NabilaAhmedAdity commented 2 months ago

I am seeing the same error :(

willydouhard commented 2 months ago

If you want the chain step to not be indented try:

@cl.step(type="run", root=True)
NabilaAhmedAdity commented 2 months ago

Hi Will, thanks for reply. I think the problem is not about indentation, rather the created_time of steps is getting set as NULL. That's why when reading threads and steps from the database, and getting it sorted by time, the steps are appearing before the user message and kinda broke the sequence or flow of messages.