Maplemx / Agently

[AI Agent Application Development Framework] - 🚀 Build AI agent native application in very few code 💬 Easy to interact with AI agent in code using structure data and chained-calls syntax 🧩 Enhance AI Agent using plugins instead of rebuild a whole new agent
http://agently.tech
Apache License 2.0
908 stars 100 forks source link

使用workflow时出现Exception: [Agent Request] Error: An error occurred during streaming #108

Closed zifeiyu-tan closed 2 weeks ago

zifeiyu-tan commented 2 weeks ago

我的代码如下:

`import Agently import logging

agent_factory = ( Agently.AgentFactory() .set_settings("model.OpenAI.auth.api_key", "nothing") .set_settings("model.OpenAI.options", {"model": "Qwen-Chat"}) .set_settings("model.OpenAI.url", "http://127.0.0.1:9997/v1") )

workflow = Agently.Workflow()

workflow.settings.set("logger", logging.getLogger("Workflow").setLevel(logging.WARNING)) agent = agent_factory.create_agent()

start_chunk = workflow.schema.create_chunk( title = "Multi-Round Chat", type = "Start" )

user_input_chunk = workflow.schema.create_chunk( title = "User Input", handles = { "outputs": [{ "handle": "user_input" }], }, executor = lambda inputs, storage: { "user_input": input("User: ")#<- You can change it to an API/WS request },# You can use a lambda function as an executor )

def assistant_reply_executor(inputs, storage): chat_history = storage.get("chat_history") or [] reply = ( agent .set_role("姓名","智能客服") .set_role("角色", "你是XX有限公司的一名客服") .set_role( "行动规则", "首先需要根据{意图判定规则}对用户输入进行意图判定,"+ "根据上下文,将客户问题总结成简单问句,如果问句主语不全,根据上下文补全问句主语" ) .set_role( "意图判定规则", "从['日常闲聊', '公司部门介绍', '公司基本信息', '公司财经信息']中选择你判定的用户意图" )

        .user_info("和你对话的用户是青鸟消防有限公司的潜在客户")

        .chat_history(chat_history)
        .input(inputs["user_input"])

        .output({
            "意图":("str","根据意图判定规则和{input}判定用户意图"),
            "问题":("str","根据上下文,将{input}总结成简单问句,如果问句主语不全,根据上下文补全问句主语"),
            "对象":("str","提取{input}的主体")
        })
        .start()
)

print("[客服]: ", reply)
return { "assistant_reply": reply }

assistant_reply_chunk = workflow.schema.create_chunk( title = "Assistant Reply", handles = { "inputs": [{ "handle": "user_input" }], "outputs": [{ "handle": "assistant_reply" }], }, executor = assistant_reply_executor, )

def update_chat_history_executor(inputs, storage): chat_history = storage.get("chat_history") or [] chat_history.append({ "role": "user", "content": inputs["user_input"] }) chat_history.append({ "role": "assistant", "content": inputs["assistant_reply"] }) storage.set("chat_history", chat_history) return update_chat_history_chunk = workflow.schema.create_chunk( title = "Update Chat History", handles = { "inputs": [ { "handle": "user_input" }, { "handle": "assistant_reply" }, ], }, executor = update_chat_history_executor, )

start_chunk.connect_to(user_input_chunk) user_input_chunk.handle("user_input").connect_to(assistant_reply_chunk.handle("user_input")) user_input_chunk.handle("user_input").connect_to(update_chat_history_chunk.handle("user_input")) assistant_reply_chunk.handle("assistant_reply").connect_to(update_chat_history_chunk.handle("assistant_reply"))

update_chat_history_chunk.connect_to(user_input_chunk) workflow.start()`

第 一次提问,问题正常回答,第二次提问,会报如下错误:

客服: {'意图': '日常闲聊', '问题': '爸爸的爸爸叫什么?', '对象': '爸爸的爸爸'}

Exception in thread Thread-2 (start_in_theard): Traceback (most recent call last): File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 240, in start_in_theard reply = loop.run_until_complete(self.start_async(request_type)) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 227, in start_async raise(e) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 202, in start_async async for response in event_generator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/plugins/request/OpenAI.py", line 223, in broadcast_response_with_streaming async for part in response_generator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/openai/_streaming.py", line 147, in aiter async for item in self._iterator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/openai/_streaming.py", line 174, in stream raise APIError( openai.APIError: An error occurred during streaming

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/threading.py", line 1016, in _bootstrap_inner self.run() File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 243, in start_in_theard raise Exception(f"[Agent Request] Error: { str(e) }") Exception: [Agent Request] Error: An error occurred during streaming

User:

zifeiyu-tan commented 2 weeks ago

我是用xinference部署的本地Qwen模型,我看了下好像跟xinference有关。xinference的log如下: 2024-06-17 16:41:21,021 xinference.api.restful_api 79450 ERROR Chat completion stream got an error: [address=127.0.0.1:41927, pid=136077] 'list' object has no attribute 'lstrip' Traceback (most recent call last): File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/api/restful_api.py", line 1444, in stream_results iterator = await model.chat( File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/backends/context.py", line 227, in send return self._process_result_message(result) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/backends/context.py", line 102, in _process_result_message raise message.as_instanceof_cause() File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/backends/pool.py", line 659, in send result = await self._run_coro(message.message_id, coro) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/backends/pool.py", line 370, in _run_coro return await coro File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/api.py", line 384, in __on_receive__ return await super().__on_receive__(message) # type: ignore File "xoscar/core.pyx", line 558, in __on_receive__ raise ex File "xoscar/core.pyx", line 520, in xoscar.core._BaseActor.__on_receive__ async with self._lock: File "xoscar/core.pyx", line 521, in xoscar.core._BaseActor.__on_receive__ with debug_async_timeout('actor_lock_timeout', File "xoscar/core.pyx", line 526, in xoscar.core._BaseActor.__on_receive__ result = await result File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/core/utils.py", line 45, in wrapped ret = await func(*args, **kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/core/model.py", line 87, in wrapped_func ret = await fn(self, *args, **kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xoscar/api.py", line 462, in _wrapper r = await func(self, *args, **kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/core/model.py", line 473, in chat response = await self._call_wrapper( File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/core/model.py", line 111, in _async_wrapper return await fn(*args, **kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/core/model.py", line 369, in _call_wrapper ret = await asyncio.to_thread(fn, *args, **kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/asyncio/threads.py", line 25, in to_thread return await loop.run_in_executor(None, func_call) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/concurrent/futures/thread.py", line 58, in run result = self.fn(*self.args, **self.kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/model/llm/pytorch/core.py", line 512, in chat full_prompt = self._get_full_prompt(prompt, system_prompt, chat_history, tools) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/model/llm/pytorch/core.py", line 551, in _get_full_prompt full_prompt = ChatModelMixin.get_prompt( File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/xinference/model/llm/utils.py", line 292, in get_prompt content = content.lstrip("\n").rstrip() AttributeError: [address=127.0.0.1:41927, pid=136077] 'list' object has no attribute 'lstrip'

Maplemx commented 2 weeks ago

def update_chat_history_executor(inputs, storage): chat_history = storage.get("chat_history") or [] chat_history.append({ "role": "user", "content": inputs["user_input"] }) chat_history.append({ "role": "assistant", "content": inputs["assistant_reply"] }) storage.set("chat_history", chat_history)

这一段打印一下inputs的信息,有可能因为3.3.0.0版本的inputs数据传递做了修改,导致inputs未提供正确的user_input和assistant_reply信息

我们在3.3.1.0版本会修正这个信息,请更新到3.3.1.0版本试试

zifeiyu-tan commented 2 weeks ago

客服: {'意图': '日常闲聊', '问题': '爸爸的爸爸叫什么?', '关键词': ['爸爸', '爸爸的爸爸']} ----------Input: {'user_input': '爸爸的爸爸叫什么?', 'assistant_reply': {'意图': '日常闲聊', '问题': '爸爸的爸爸叫什么?', '关键词': ['爸爸', '爸爸的爸爸']}} User: 妈妈 的妈妈叫什么》 Exception in thread Thread-2 (start_in_theard): Traceback (most recent call last): File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 240, in start_in_theard reply = loop.run_until_complete(self.start_async(request_type)) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 227, in start_async raise(e) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 202, in start_async async for response in event_generator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/plugins/request/OpenAI.py", line 223, in broadcast_response_with_streaming async for part in response_generator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/openai/_streaming.py", line 147, in aiter async for item in self._iterator: File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/openai/_streaming.py", line 174, in stream raise APIError( openai.APIError: An error occurred during streaming

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/threading.py", line 1016, in _bootstrap_inner self.run() File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/home/user/anaconda3/envs/tan_agently/lib/python3.10/site-packages/Agently/Agent/Agent.py", line 243, in start_in_theard raise Exception(f"[Agent Request] Error: { str(e) }") Exception: [Agent Request] Error: An error occurred during streaming

----------Input: {'user_input': '妈妈的妈妈叫什么》', 'assistant_reply': None} User:

其中----------Input:下方的字典是我打印的inputs信息。 另外agently已更新到最新

Maplemx commented 2 weeks ago

问题应该在这里:

def update_chat_history_executor(inputs, storage):
    chat_history = storage.get("chat_history") or []
    chat_history.append({ "role": "user", "content": inputs["user_input"] })
    # chat_history.append({ "role": "assistant", "content": inputs["assistant_reply"] })
    # 修正方式:
    chat_history.append({ "role": "assistant", "content": str(inputs["assistant_reply"]) })
    storage.set("chat_history", chat_history)
    print(chat_history) #<-通过打印出这个位置的信息,我们会发现,content的内容是一个字典,但标准的content应该是一个字符串
    return

⚠️ 但也需要注意,我们不推荐直接将结构化的数据结果放入content字段,随着对话记录的增加,会让模型在输出时出现注意力失焦或是对结构性输出的关注度大于内容本身。

Maplemx commented 2 weeks ago

附上我的完整测试代码:

import Agently
import logging

agent_factory = (
Agently.AgentFactory()
  .set_settings("current_model", "OAIClient")
  .set_settings("model.OAIClient.auth.api_key", "")
  .set_settings("model.OAIClient.url", "https://api.deepseek.com/")
  .set_settings("model.OAIClient.options.model", "deepseek-chat")
)

workflow = Agently.Workflow()

workflow.settings.set("logger", logging.getLogger("Workflow").setLevel(logging.WARNING))
agent = agent_factory.create_agent()

start_chunk = workflow.schema.create_chunk(
    title = "Multi-Round Chat",
    type = "Start"
)

user_input_chunk = workflow.schema.create_chunk(
    title = "User Input",
    handles = {
    "outputs": [{ "handle": "user_input" }],
    },
    executor = lambda inputs, storage: {
    "user_input": input("User: ")#<- You can change it to an API/WS request
    },# You can use a lambda function as an executor
)

def assistant_reply_executor(inputs, storage):
    chat_history = storage.get("chat_history") or []
    reply = (
        agent
            .set_role("姓名","智能客服")
            .set_role("角色", "你是XX有限公司的一名客服")
            .set_role(
                "行动规则",
                "首先需要根据{意图判定规则}对用户输入进行意图判定,"+
                "根据上下文,将客户问题总结成简单问句,如果问句主语不全,根据上下文补全问句主语"
            )
            .set_role(
                "意图判定规则",
                "从['日常闲聊', '公司部门介绍', '公司基本信息', '公司财经信息']中选择你判定的用户意图"
            )
            .user_info("和你对话的用户是青鸟消防有限公司的潜在客户")
            .chat_history(chat_history)
            .input(inputs["user_input"])
            .output({
                "意图":("str","根据意图判定规则和{input}判定用户意图"),
                "问题":("str","根据上下文,将{input}总结成简单问句,如果问句主语不全,根据上下文补全问句主语"),
                "对象":("str","提取{input}的主体")
            })
            .start()
    )
    print("[客服]: ", reply)
    return { "assistant_reply": reply }

assistant_reply_chunk = workflow.schema.create_chunk(
    title = "Assistant Reply",
    handles = {
        "inputs": [{ "handle": "user_input" }],
        "outputs": [{ "handle": "assistant_reply" }],
    },
    executor = assistant_reply_executor,
)

def update_chat_history_executor(inputs, storage):
    chat_history = storage.get("chat_history") or []
    chat_history.append({ "role": "user", "content": inputs["user_input"] })
    # chat_history.append({ "role": "assistant", "content": inputs["assistant_reply"] })
    # 修正方式:
    chat_history.append({ "role": "assistant", "content": str(inputs["assistant_reply"]) })
    storage.set("chat_history", chat_history)
    print(chat_history) #<-通过打印出这个位置的信息,我们会发现,content的内容是一个字典,但标准的content应该是一个字符串
    return

update_chat_history_chunk = workflow.schema.create_chunk(
    title = "Update Chat History",
    handles = {
        "inputs": [
            { "handle": "user_input" },
            { "handle": "assistant_reply" },
        ],
    },
    executor = update_chat_history_executor,
)

start_chunk.connect_to(user_input_chunk)
user_input_chunk.handle("user_input").connect_to(assistant_reply_chunk.handle("user_input"))
user_input_chunk.handle("user_input").connect_to(update_chat_history_chunk.handle("user_input"))
assistant_reply_chunk.handle("assistant_reply").connect_to(update_chat_history_chunk.handle("assistant_reply"))

update_chat_history_chunk.connect_to(user_input_chunk)
workflow.start()
zifeiyu-tan commented 2 weeks ago

已解决,感谢

Maplemx commented 2 weeks ago

在https://github.com/Maplemx/Agently/commit/492ff7adeba9a16f01e63bf462e6fe797d6b4332 对非string的content内容进行了进一步适配,防止出现类似错误

Maplemx commented 2 weeks ago

published in version 3.3.1.1