langgenius / dify

Dify is an open-source LLM app development platform. Dify's intuitive interface combines AI workflow, RAG pipeline, agent capabilities, model management, observability features and more, letting you quickly go from prototype to production.
https://dify.ai

Add a custom built-in tool to the workflow and run it, but the workflow gets stuck in the "running" mode and does not produce any results. #4690

Open D-siheng opened 1 month ago

D-siheng commented 1 month ago

Dify version

0.6.7

Cloud or Self Hosted

Self Hosted (Docker)

Steps to reproduce

I wrote a built-in tool following the official guidelines, using the Wiki search tool's code as a reference. The tool outputs document summaries via langchain's load_summarize_chain. It shows up normally in the list of built-in tools and, in testing, runs correctly inside a workflow and outputs document summaries.

I created and published a START → document summary → END workflow. When I ran it from the Explore interface, all three nodes displayed as completed: the summary tool output the document summary correctly in its node, and the END node had normal input and output. However, the workflow itself stayed stuck in the "running" state and never transitioned to "succeeded", so the summary result was never rendered on screen. After testing, the hang only occurs when the document to be summarized is a .docx file. Besides the workflow, I also tried a chatflow consisting of three nodes (START → document summary → ANSWER); it finished normally and output the formatted document summary. My code is as follows:

# summarizer.yaml
identity:
  author: DLH
  name: summarizer
  label:
    en_US: Summarizer
    zh_Hans: 文档摘要
    pt_BR: Summarizer
  description:
    en_US: This tool is a documentation abstract of load_summarize_chain implementation using langchain
    zh_Hans: 该工具是使用langchain的load_summarize_chain实现的文档摘要
    pt_BR: This tool is a documentation abstract of load_summarize_chain implementation using langchain
  icon: icon.svg
credentials_for_provider:
# summarizer.py
from core.tools.errors import ToolProviderCredentialValidationError
from core.tools.provider.builtin_tool_provider import BuiltinToolProviderController

from core.tools.provider.builtin.summarizer.tools.langchain_summarize import SummarizeTool

class SummarizeProvider(BuiltinToolProviderController):
    def _validate_credentials(self, credentials: dict) -> None:
        try:
            SummarizeTool().fork_tool_runtime(
                meta={
                    "credentials": credentials,
                }
            ).invoke(
                user_id='',
                tool_parameters={
                    "File_Path": "file path",
                    # 1. Added: split .docx documents into chunks of ChunkSize
                    "ChunkSize": 0
                },
            )
        except Exception as e:
            raise ToolProviderCredentialValidationError(str(e))
# langchain_summarize.yaml
identity:
  author: DLH
  name: summarizer
  label:
    en_US: Summarizer
    zh_Hans: 文档摘要
    pt_BR: Summarizer
  description:
    en_US: This tool is a documentation abstract of load_summarize_chain implementation using langchain
    zh_Hans: 该工具是使用langchain的load_summarize_chain实现的文档摘要
    pt_BR: This tool is a documentation abstract of load_summarize_chain implementation using langchain
  icon: icon.svg
credentials_for_provider:
# langchain_summarize.py
import os
import glob
import hashlib
import tiktoken
from langchain_community.llms import VLLMOpenAI
from langchain.prompts import PromptTemplate
# 1. Added: split .docx documents into chunks of ChunkSize
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.docstore.document import Document
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import Docx2txtLoader
import logging
import traceback

from typing import Any, Optional, Union
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.tool.builtin_tool import BuiltinTool

class SummarizeTool(BuiltinTool):
    # document summarization function
    def summarize_from_folder(self, folder, llm, ChunkSize):
        # variable definitions
        summaries = []

        logging.basicConfig(level=logging.DEBUG)

        logging.info('summarize function: start')

        # load the LLM
        QWEN = VLLMOpenAI(
            max_tokens=10000, # total token budget for the model
            temperature=0.7, # temperature; higher values yield more varied output
            openai_api_key="EMPTY", # OpenAI API key; set to EMPTY because a local model is used — supply a real key for a hosted model
            openai_api_base="http://xx.xx.x.xx:8080/v1", # API endpoint of the vLLM inference engine serving the model (address redacted with xx for privacy)
            model_name="/data/models/Qwen1.5-72B-Chat/" # as above, the name of the served model
        )

        # read documents
        for file in glob.glob(folder + "/*"):
            # get the file name
            file_name = os.path.basename(file)
            # PDF documents
            if file.endswith(".pdf"):
                loader = PyPDFLoader(file)
                # returns a list of Documents stored in docs, one element per page
                docs = loader.load_and_split()
            # docx documents
            elif file.endswith(".docx"):
                docx_loader = Docx2txtLoader(file)
                # 1. Added: split .docx documents into chunks of ChunkSize
                if ChunkSize != 0: # when ChunkSize is set, split the docx into chunks of that size
                    text_splitter = RecursiveCharacterTextSplitter(chunk_size=ChunkSize, chunk_overlap=50)
                    data = docx_loader.load_and_split(text_splitter)
                else: # otherwise split the docx on '\n\n'
                    data = docx_loader.load()
                    text_splitter = CharacterTextSplitter(chunk_size=512, chunk_overlap=50)
                docs = text_splitter.split_documents(data)
            else:
                return self.create_text_message("Unsupported file type. Only support docx & pdf.")
                # print(f"Unsupported file type: {file}.Only support docx & pdf.")
                # continue
            # generate the document summary
            chain = load_summarize_chain(llm, chain_type="map_reduce", return_intermediate_steps=False)
            summary = chain.run(docs)
            # build a string containing the file name and its summary
            file_summary_str = f"Summary of {file_name}:\n\n{summary}"
            # print(file_summary_str)
            # print("\n")
            summaries.append(file_summary_str)

            logging.info('summarize function: end')
        return "\n\n".join(summaries)

    def _invoke(self,
                user_id: str,  # user ID
                tool_parameters: dict[str, Any],  # tool parameters
                ) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
        """
            invoke tools
        """

        blobpath = "https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken"
        cache_key = hashlib.sha1(blobpath.encode()).hexdigest()
        # configure tiktoken's local cache directory
        tiktoken_cache_dir = "/app/api"
        os.environ["TIKTOKEN_CACHE_DIR"] = tiktoken_cache_dir
        assert os.path.exists(os.path.join(tiktoken_cache_dir, cache_key))

        folder = tool_parameters.get('File_Path', '')
        # 1. Added: split .docx documents into chunks of ChunkSize
        chunksize = tool_parameters.get('ChunkSize', 0)
        if not folder:
            return self.create_text_message('Please input folder')
        logging.info('_invoke: start')
        result = self.summarize_from_folder(folder, QWEN, chunksize)

        # result = "\n\n".join(results)

        with open(filename, 'w') as f:
            traceback.print_stack(file=f)

        logging.info('_invoke: end')

        return self.create_text_message(self.summary(user_id=user_id, content=result))

✔️ Expected Behavior

The workflow should be able to complete normally and output JSON-formatted results.

❌ Actual Behavior

Even though all three nodes have finished and their status shows as completed, the workflow remains in the running state and never outputs the final result.

dosubot[bot] commented 1 month ago

The workflow gets stuck in the "running" state and does not produce any results when summarizing a docx document because the _run method in the ToolNode class does not handle the case where the tool invocation does not return any messages or the messages are not properly converted. Specifically, the _convert_tool_messages method might not be handling the messages correctly, leading to an incomplete or empty result.

Here is the relevant code snippet that shows the potential issue:

def _run(self, variable_pool: VariablePool) -> NodeRunResult:
    # ... (other code)

    try:
        messages = ToolEngine.workflow_invoke(
            tool=tool_runtime,
            tool_parameters=parameters,
            user_id=self.user_id,
            workflow_id=self.workflow_id, 
            workflow_tool_callback=DifyWorkflowCallbackHandler()
        )
    except Exception as e:
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.FAILED,
            inputs=parameters,
            metadata={
                NodeRunMetadataKey.TOOL_INFO: tool_info
            },
            error=f'Failed to invoke tool: {str(e)}',
        )

    # convert tool messages
    plain_text, files = self._convert_tool_messages(messages)

    return NodeRunResult(
        status=WorkflowNodeExecutionStatus.SUCCEEDED,
        outputs={
            'text': plain_text,
            'files': files
        },
        metadata={
            NodeRunMetadataKey.TOOL_INFO: tool_info
        },
        inputs=parameters
    )

The _convert_tool_messages method is responsible for converting the tool messages into plain text and files. If this method does not handle the messages correctly, it could result in an empty or incomplete result, causing the workflow to get stuck.

To fix this issue, you should ensure that the _convert_tool_messages method correctly handles all possible message types and scenarios. According to the Dify documentation, Dify supports various message types such as text, link, image, and file BLOB. Make sure that your _convert_tool_messages method can handle these types appropriately. Here are some helper methods provided by Dify for creating different types of messages:

def create_image_message(self, image: str, save_as: str = '') -> ToolInvokeMessage:
    # create an image message
    pass

def create_link_message(self, link: str, save_as: str = '') -> ToolInvokeMessage:
    # create a link message
    pass

def create_text_message(self, text: str, save_as: str = '') -> ToolInvokeMessage:
    # create a text message
    pass

def create_blob_message(self, blob: bytes, meta: dict = None, save_as: str = '') -> ToolInvokeMessage:
    # create a blob message
    pass

Additionally, you may want to add logging or debugging statements to trace the flow of data and identify where the process might be failing. This will help you pinpoint the exact issue and ensure that all message types are being handled correctly.
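As a sketch of such tracing, the invocation and conversion steps can be wrapped with log statements so a hang can be localized to one step; `invoke` and `convert` below are placeholder callables standing in for wherever the tool invocation and message conversion happen in your deployment:

```python
import logging

logger = logging.getLogger("tool_node")

def run_with_tracing(invoke, convert, parameters):
    # log before and after each step so a hang or empty result
    # can be attributed to invocation vs. conversion
    logger.debug("invoking tool with parameters: %r", parameters)
    messages = invoke(parameters)
    logger.debug("tool returned %d message(s)", len(messages))
    plain_text, files = convert(messages)
    logger.debug("converted to %d chars of text, %d file(s)",
                 len(plain_text), len(files))
    return plain_text, files
```

Running with `logging.basicConfig(level=logging.DEBUG)` then shows exactly which step was last reached before the workflow stalled.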

Furthermore, ensure that you have properly authorized the tool and configured its inputs and parameters as per the Dify documentation. Configuring a tool node generally involves two steps:

  1. Authorizing the Tool/Creating Custom Tools
  2. Configuring Tool Inputs and Parameters

For detailed guidance on creating custom tools and configuring them, please refer to the tool configuration instructions.

Additionally, the useWorkflowRun hook in Dify's codebase provides a comprehensive mechanism for handling workflow runs, including starting, stopping, and managing the state of the workflow. This hook ensures that the workflow's state is properly managed and updated, which can help in diagnosing and resolving issues related to workflows getting stuck. Here is a relevant excerpt from the useWorkflowRun hook:

const handleRun = useCallback(async (
    params: any,
    callback?: IOtherOptions,
  ) => {
    // ... (other code)

    ssePost(
      url,
      {
        body: params,
      },
      {
        onWorkflowStarted: (params) => {
          // ... (handle workflow started)
        },
        onWorkflowFinished: (params) => {
          // ... (handle workflow finished)
        },
        onError: (params) => {
          // ... (handle error)
        },
        onNodeStarted: (params) => {
          // ... (handle node started)
        },
        onNodeFinished: (params) => {
          // ... (handle node finished)
        },
        onTextChunk: (params) => {
          // ... (handle text chunk)
        },
        onTextReplace: (params) => {
          // ... (handle text replace)
        },
        ...restCallback,
      },
    )
  }, [store, reactflow, workflowStore, doSyncWorkflowDraft])

This hook ensures that the workflow's state transitions are properly managed and provides callbacks for various stages of the workflow execution, which can be useful for debugging and ensuring that the workflow does not get stuck [1][2][3][4].
