pipecat-ai / pipecat

Open Source framework for voice and multimodal conversational AI
BSD 2-Clause "Simplified" License

dify connection #377

Open chengleilovesky opened 4 weeks ago

chengleilovesky commented 4 weeks ago

I implemented Dify as the LLM service.

```python
import json

import aiohttp
from loguru import logger

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesFrame,
    LLMModelUpdateFrame,
    TextFrame,
    VisionImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService


class DifyHttpService(LLMService):
    """DifyHttpService uses aiohttp to send asynchronous HTTP requests and parse the responses.

    The service consumes OpenAILLMContextFrame frames, which contain a reference to an
    OpenAILLMContext object. The OpenAILLMContext object defines the context that is sent
    to the LLM for a completion. This includes user, assistant and system messages, as well
    as tool choices and the tools used (if the LLM requests a function call).
    """

    def __init__(self, api_key: str, base_url: str = "http://localhost/v1", **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key
        self.base_url = base_url

    async def send_chat_request(
        self,
        query: str,
        response_mode: str = "streaming",
        conversation_id: str = "",
        user: str = "abc-123",
    ):
        url = f"{self.base_url}/chat-messages"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        data = {
            "inputs": {},
            "query": query,
            "response_mode": response_mode,
            "conversation_id": conversation_id,
            "user": user,
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, headers=headers, json=data) as response:
                    response.raise_for_status()
                    # Dify streams server-sent events; each payload line starts with "data: ".
                    async for line in response.content:
                        decoded_line = line.decode("utf-8").strip()
                        if decoded_line.startswith("data: "):
                            json_data = decoded_line[6:]
                            parsed_data = self.parse_message_data(json_data)
                            if parsed_data.get("answer"):
                                await self.push_frame(TextFrame(parsed_data["answer"]))
            except aiohttp.ClientResponseError as e:
                logger.error(f"HTTP error occurred: {e.status} - {e.message}")
            except Exception as e:
                logger.error(f"An error occurred: {e}")

    async def _process_context(self, context: OpenAILLMContext):
        await self.push_frame(LLMFullResponseStartFrame())
        logger.debug(f"Generating chat: {context.get_messages_json()}")
        query = self._get_query_from_context(context)
        await self.start_ttfb_metrics()
        await self.send_chat_request(query)
        await self.stop_ttfb_metrics()
        await self.push_frame(LLMFullResponseEndFrame())

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        context = None

        if isinstance(frame, OpenAILLMContextFrame):
            context: OpenAILLMContext = frame.context
        elif isinstance(frame, LLMMessagesFrame):
            context = OpenAILLMContext.from_messages(frame.messages)
        elif isinstance(frame, VisionImageRawFrame):
            context = OpenAILLMContext.from_image_frame(frame)
        elif isinstance(frame, LLMModelUpdateFrame):
            logger.debug(f"Switching LLM model to: [{frame.model}]")
            # NOTE: _create_client is not defined in this class.
            self._create_client(frame.model)
        else:
            await self.push_frame(frame, direction)

        if context:
            await self._process_context(context)

    def parse_message_data(self, json_data: str) -> dict:
        """Parse the JSON data returned by the server and extract the useful fields.

        :param json_data: a JSON-formatted string
        :return: a dictionary containing the extracted fields
        """
        try:
            # Parse the JSON string into a Python dictionary.
            data = json.loads(json_data)

            # Extract the useful fields.
            return {
                "event": data.get("event"),
                "conversation_id": data.get("conversation_id"),
                "message_id": data.get("message_id"),
                "created_at": data.get("created_at"),
                "task_id": data.get("task_id"),
                "id": data.get("id"),
                "answer": data.get("answer"),
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON: {e}")
            return {}

    def _get_query_from_context(self, context: OpenAILLMContext) -> str:
        # Dify takes a single query string, so send the content of the last message.
        openai_messages = context.get_messages()
        logger.debug(openai_messages)
        return openai_messages[-1]["content"]
```
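The SSE parsing in `parse_message_data` can be exercised on its own, outside the pipeline. A minimal sketch, assuming the stream emits `data: {...}` lines; the sample payload and the helper name `parse_dify_sse_line` are illustrative, not from the code above or a captured Dify response:

```python
import json


def parse_dify_sse_line(line: str) -> dict:
    """Extract the JSON payload from a single SSE line, if present."""
    line = line.strip()
    if not line.startswith("data: "):
        return {}
    try:
        data = json.loads(line[6:])
    except json.JSONDecodeError:
        return {}
    # Keep only the fields the service above pushes downstream.
    return {k: data.get(k) for k in ("event", "conversation_id", "message_id", "answer")}


# Illustrative payload (not a captured Dify response).
sample = 'data: {"event": "message", "conversation_id": "c1", "message_id": "m1", "answer": "Hello"}'
print(parse_dify_sse_line(sample)["answer"])  # Hello
```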
chengleilovesky commented 4 weeks ago

In the websocket server case, interrupting when the user starts speaking does not work particularly well. I found that the audio is transmitted too fast, which makes it impossible to interrupt. Could that cause two segments of speech to be played simultaneously? Interruption in Daily works very well.

ramishi commented 3 weeks ago

do you have an example bot.py on how to use this? I have a similar use case and would like to try your implementation.

aconchillo commented 3 weeks ago

There's this PR https://github.com/pipecat-ai/pipecat/pull/378 from @ramishi

chengleilovesky commented 3 weeks ago

> do you have an example bot.py on how to use this? I have a similar use case and would like to try your implementation.

How can I give you an example?

ramishi commented 3 weeks ago

> > do you have an example bot.py on how to use this? I have a similar use case and would like to try your implementation.
>
> How can I give you an example?

You can post it here please @chengleilovesky, or we can PM if that's better for you.