langgenius / dify

Dify is an open-source LLM app development platform. Dify's intuitive interface combines AI workflow, RAG pipeline, agent capabilities, model management, observability features and more, letting you quickly go from prototype to production.
https://dify.ai

json output property is empty when a workflow is used as tool in another workflow #6924

Closed. akramer-zibra closed this issue 1 month ago.

akramer-zibra commented 1 month ago

Dify version

0.6.15

Cloud or Self Hosted

Self Hosted (Docker)

Steps to reproduce

(1) Create a workflow (A) that takes a simple string input parameter at the start and returns it as the result when the workflow ends.

Screenshot Workflow-A

(2) Create a second workflow (B) that takes a simple input parameter at the start and passes it as input into workflow A. This parent workflow provides text, files, and json as its final output when it ends.

Screenshot Workflow-B

(3) Run the parent workflow (B) with the input Hello World!
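For reference, the same run can be triggered via Dify's workflow API. A minimal sketch (the base URL, API key, and the start variable's name are placeholders for this setup):

import requests

# Placeholders: adjust the base URL, the API key of the parent workflow (B),
# and the name of its start input variable to match your installation.
DIFY_BASE_URL = "http://localhost/v1"
API_KEY = "app-..."

response = requests.post(
    f"{DIFY_BASE_URL}/workflows/run",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json={
        "inputs": {"input": "Hello World!"},
        "response_mode": "blocking",
        "user": "debug-user",
    },
)
# The run's outputs contain the text/files/json properties shown below.
print(response.json()["data"]["outputs"])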

✔️ Expected Behavior

The json property in the result of the parent workflow (B) contains the result of the integrated workflow (A):

{
    "text": "...",
    "files": [],
    "json": {
        "result": "Hello World!"
    }
}

❌ Actual Behavior

The json property in the result of the parent workflow (B) is an empty array, and the text property contains an incorrectly stringified version of the expected result.

{
  "text": "{\"result\": \"1722627351591.name\"}",
  "files": [],
  "json": []
}
dosubot[bot] commented 1 month ago

To resolve the issue where the json property in the result of the parent workflow (B) is empty and the expected result is incorrectly placed in the text property when workflow (A) is integrated into workflow (B), you need to ensure that the LLMNode is correctly configured to extract the JSON response.

From the provided code, it appears that the LLMNode is used to handle the response. The prompt_template and prompt_config should be set up to ensure the JSON response is correctly extracted and placed in the json property.

Here is an example of how you can ensure the JSON response is correctly extracted and placed in the json property:

import json
import os
from unittest.mock import MagicMock

import pytest

from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
from core.entities.provider_entities import CustomConfiguration, CustomProviderConfiguration, SystemConfiguration
from core.model_manager import ModelInstance
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.model_providers import ModelProviderFactory
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.base_node import UserFrom
from core.workflow.nodes.llm.llm_node import LLMNode
from extensions.ext_database import db
from models.provider import ProviderType
from models.workflow import WorkflowNodeExecutionStatus

@pytest.mark.parametrize('setup_openai_mock', [['chat']], indirect=True)
def test_execute_llm(setup_openai_mock):
    node = LLMNode(
        tenant_id='1',
        app_id='1',
        workflow_id='1',
        user_id='1',
        invoke_from=InvokeFrom.WEB_APP,
        user_from=UserFrom.ACCOUNT,
        config={
            'id': 'llm',
            'data': {
                'title': '123',
                'type': 'llm',
                'model': {
                    'provider': 'openai',
                    'name': 'gpt-3.5-turbo',
                    'mode': 'chat',
                    'completion_params': {}
                },
                'prompt_template': [
                    {
                        'role': 'system',
                        'text': 'you are a helpful assistant.\ntoday\'s weather is {{#abc.output#}}.'
                    },
                    {
                        'role': 'user',
                        'text': '{{#sys.query#}}'
                    }
                ],
                'memory': None,
                'context': {
                    'enabled': False
                },
                'vision': {
                    'enabled': False
                }
            }
        }
    )

    # construct variable pool
    pool = VariablePool(system_variables={
        SystemVariable.QUERY: 'what\'s the weather today?',
        SystemVariable.FILES: [],
        SystemVariable.CONVERSATION_ID: 'abababa',
        SystemVariable.USER_ID: 'aaa'
    }, user_inputs={})
    pool.append_variable(node_id='abc', variable_key_list=['output'], value='sunny')

    credentials = {
        'openai_api_key': os.environ.get('OPENAI_API_KEY')
    }
    provider_instance = ModelProviderFactory().get_provider_instance('openai')
    model_type_instance = provider_instance.get_model_instance(ModelType.LLM)
    provider_model_bundle = ProviderModelBundle(
        configuration=ProviderConfiguration(
            tenant_id='1',
            provider=provider_instance.get_provider_schema(),
            preferred_provider_type=ProviderType.CUSTOM,
            using_provider_type=ProviderType.CUSTOM,
            system_configuration=SystemConfiguration(
                enabled=False
            ),
            custom_configuration=CustomConfiguration(
                provider=CustomProviderConfiguration(
                    credentials=credentials
                )
            ),
            model_settings=[]
        ),
        provider_instance=provider_instance,
        model_type_instance=model_type_instance
    )
    model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model='gpt-3.5-turbo')
    model_config = ModelConfigWithCredentialsEntity(
        model='gpt-3.5-turbo',
        provider='openai',
        mode='chat',
        credentials=credentials,
        parameters={},
        model_schema=model_type_instance.get_model_schema('gpt-3.5-turbo'),
        provider_model_bundle=provider_model_bundle
    )

    # Mock db.session.close()
    db.session.close = MagicMock()

    node._fetch_model_config = MagicMock(return_value=(model_instance, model_config))

    # execute node
    result = node.run(pool)

    assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
    assert result.outputs['text'] is not None
    assert result.outputs['usage']['total_tokens'] > 0

@pytest.mark.parametrize('setup_code_executor_mock', [['none']], indirect=True)
@pytest.mark.parametrize('setup_openai_mock', [['chat']], indirect=True)
def test_execute_llm_with_jinja2(setup_code_executor_mock, setup_openai_mock):
    """
    Test execute LLM node with jinja2
    """
    node = LLMNode(
        tenant_id='1',
        app_id='1',
        workflow_id='1',
        user_id='1',
        invoke_from=InvokeFrom.WEB_APP,
        user_from=UserFrom.ACCOUNT,
        config={
            'id': 'llm',
            'data': {
                'title': '123',
                'type': 'llm',
                'model': {
                    'provider': 'openai',
                    'name': 'gpt-3.5-turbo',
                    'mode': 'chat',
                    'completion_params': {}
                },
                'prompt_config': {
                    'jinja2_variables': [{
                        'variable': 'sys_query',
                        'value_selector': ['sys', 'query']
                    }, {
                        'variable': 'output',
                        'value_selector': ['abc', 'output']
                    }]
                },
                'prompt_template': [
                    {
                        'role': 'system',
                        'text': 'you are a helpful assistant.\ntoday\'s weather is {{#abc.output#}}',
                        'jinja2_text': 'you are a helpful assistant.\ntoday\'s weather is {{output}}.',
                        'edition_type': 'jinja2'
                    },
                    {
                        'role': 'user',
                        'text': '{{#sys.query#}}',
                        'jinja2_text': '{{sys_query}}',
                        'edition_type': 'basic'
                    }
                ],
                'memory': None,
                'context': {
                    'enabled': False
                },
                'vision': {
                    'enabled': False
                }
            }
        }
    )
    # construct variable pool
    pool = VariablePool(system_variables={
        SystemVariable.QUERY: 'what\'s the weather today?',
        SystemVariable.FILES: [],
        SystemVariable.CONVERSATION_ID: 'abababa',
        SystemVariable.USER_ID: 'aaa'
    }, user_inputs={})
    pool.append_variable(node_id='abc', variable_key_list=['output'], value='sunny')

    credentials = {
        'openai_api_key': os.environ.get('OPENAI_API_KEY')
    }

    provider_instance = ModelProviderFactory().get_provider_instance('openai')
    model_type_instance = provider_instance.get_model_instance(ModelType.LLM)
    provider_model_bundle = ProviderModelBundle(
        configuration=ProviderConfiguration(
            tenant_id='1',
            provider=provider_instance.get_provider_schema(),
            preferred_provider_type=ProviderType.CUSTOM,
            using_provider_type=ProviderType.CUSTOM,
            system_configuration=SystemConfiguration(
                enabled=False
            ),
            custom_configuration=CustomConfiguration(
                provider=CustomProviderConfiguration(
                    credentials=credentials
                )
            ),
            model_settings=[]
        ),
        provider_instance=provider_instance,
        model_type_instance=model_type_instance,
    )

    model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model='gpt-3.5-turbo')

    model_config = ModelConfigWithCredentialsEntity(
        model='gpt-3.5-turbo',
        provider='openai',
        mode='chat',
        credentials=credentials,
        parameters={},
        model_schema=model_type_instance.get_model_schema('gpt-3.5-turbo'),
        provider_model_bundle=provider_model_bundle
    )

    # Mock db.session.close()
    db.session.close = MagicMock()

    node._fetch_model_config = MagicMock(return_value=(model_instance, model_config))

    # execute node
    result = node.run(pool)
    assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
    assert 'sunny' in json.dumps(result.process_data)
    assert 'what\'s the weather today?' in json.dumps(result.process_data)

Ensure that the LLMNode is configured to use the correct prompt templates and Jinja2 variables to extract the JSON response. The prompt_template should be correctly formatted to guide the model to produce a JSON response. If the JSON response is still placed in the text property, you may need to adjust the extraction logic to parse the text property and extract the JSON content manually.
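As a stopgap on the consumer side, the stringified payload in the text property can be parsed back into a structure. A minimal sketch (extract_json_from_text is a hypothetical helper, not part of Dify):

import json

def extract_json_from_text(workflow_result: dict) -> dict:
    """Parse the stringified JSON that currently arrives in the 'text' property."""
    try:
        return json.loads(workflow_result.get("text", ""))
    except json.JSONDecodeError:
        return {}

# Example with the actual behavior reported above:
result = {"text": "{\"result\": \"1722627351591.name\"}", "files": [], "json": []}
print(extract_json_from_text(result))  # {'result': '1722627351591.name'}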

Additionally, you can use the following prompt templates from Dify to ensure the JSON response is correctly formatted and extracted:

### Extract parameter Workflow
I need to extract the following information from the input text. The <information to be extracted> tag specifies the 'type', 'description' and 'required' of the information to be extracted. 
<information to be extracted>
{{ structure }}
</information to be extracted>

Step 1: Carefully read the input and understand the structure of the expected output.
Step 2: Extract relevant parameters from the provided text based on the name and description of object. 
Step 3: Structure the extracted parameters to JSON object as specified in <structure>.
Step 4: Ensure that the JSON object is properly formatted and valid. The output should not contain any XML tags. Only the JSON object should be outputted.

### Memory
Here is the chat histories between human and assistant, inside <histories></histories> XML tags.
<histories>
{histories}
</histories>

### Structure
Here is the structure of the expected output, I should always follow the output structure. 
{{
  'properties1': 'relevant text extracted from input', 
  'properties2': 'relevant text extracted from input', 
}}

### Input Text
Inside <text></text> XML tags, there is a text that I should extract parameters and convert to a JSON object.
<text>
{text}
</text>

### Answer
I should always output a valid JSON object. Output nothing other than the JSON object. 
```JSON


By using these prompt templates, you can ensure that the JSON response is correctly formatted, extracted, and placed in the `json` property of the result [[1]](https://github.com/langgenius/dify/blob/main/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py) [[2]](https://github.com/langgenius/dify/blob/main/api/tests/integration_tests/workflow/nodes/test_llm.py) [[3]](https://github.com/langgenius/dify/blob/main/api/core/workflow/nodes/parameter_extractor/prompts.py).

crazywoola commented 1 month ago

In fact, the content returned by another workflow is returned in 'text'. Although there is a field called 'json', it is only used in certain cases.

akramer-zibra commented 1 month ago

Oh, I see. This behaviour may confuse some users, because it breaks common API expectations. Providing JSON data in stringified form inside a JSON object forces extra data transformation afterwards. Receiving the output data of such an embedded workflow directly would simplify data access and also reduce the probability of errors (caused by the extra transformation step).
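To make the difference concrete, here is a small consumer-side illustration (hypothetical values, and assuming the stringified text carries the intended result):

import json

# Current behavior: the embedded workflow's output must be re-parsed from 'text'.
current = {"text": "{\"result\": \"Hello World!\"}", "files": [], "json": []}
value = json.loads(current["text"])["result"]   # extra transformation step

# Proposed behavior: the same data would be directly accessible via 'json'.
proposed = {"text": "...", "files": [], "json": {"result": "Hello World!"}}
value = proposed["json"]["result"]              # direct access, no re-parsing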

So I'd like to propose the following change: let's also provide the output data of an embedded workflow in the json property of this "tool".

If the maintainers are okay with this, I am happy to provide a PR for it, @crazywoola.

akramer-zibra commented 1 month ago

@dosu please show me the code that runs a workflow as a tool within the API service.

dosubot[bot] commented 1 month ago

To run a workflow as a tool within the API service, you can refer to the WorkflowService class in the api/services/workflow_service.py file. This class provides methods to handle the execution of workflow nodes, including running a workflow as a tool.

Here is a relevant code snippet from workflow_service.py that demonstrates how a workflow node execution is handled:

import json
import time
from datetime import datetime, timezone
from typing import Optional

from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.workflow import (
    CreatedByRole,
    Workflow,
    WorkflowNodeExecution,
    WorkflowNodeExecutionStatus,
    WorkflowNodeExecutionTriggeredFrom,
)

class WorkflowService:
    # Other methods...

    def run_draft_workflow_node(self, app_model: App,
                                node_id: str,
                                user_inputs: dict,
                                account: Account) -> WorkflowNodeExecution:
        """
        Run draft workflow node
        """
        # fetch draft workflow by app_model
        draft_workflow = self.get_draft_workflow(app_model=app_model)
        if not draft_workflow:
            raise ValueError('Workflow not initialized')

        # run draft workflow node
        workflow_engine_manager = WorkflowEngineManager()
        start_at = time.perf_counter()

        try:
            node_instance, node_run_result = workflow_engine_manager.single_step_run_workflow_node(
                workflow=draft_workflow,
                node_id=node_id,
                user_inputs=user_inputs,
                user_id=account.id,
            )
        except WorkflowNodeRunFailedError as e:
            workflow_node_execution = WorkflowNodeExecution(
                tenant_id=app_model.tenant_id,
                app_id=app_model.id,
                workflow_id=draft_workflow.id,
                triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
                index=1,
                node_id=e.node_id,
                node_type=e.node_type.value,
                title=e.node_title,
                status=WorkflowNodeExecutionStatus.FAILED.value,
                error=e.error,
                elapsed_time=time.perf_counter() - start_at,
                created_by_role=CreatedByRole.ACCOUNT.value,
                created_by=account.id,
                created_at=datetime.now(timezone.utc).replace(tzinfo=None),
                finished_at=datetime.now(timezone.utc).replace(tzinfo=None)
            )
            db.session.add(workflow_node_execution)
            db.session.commit()

            return workflow_node_execution

        if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
            # create workflow node execution
            workflow_node_execution = WorkflowNodeExecution(
                tenant_id=app_model.tenant_id,
                app_id=app_model.id,
                workflow_id=draft_workflow.id,
                triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
                index=1,
                node_id=node_id,
                node_type=node_instance.node_type.value,
                title=node_instance.node_data.title,
                inputs=json.dumps(node_run_result.inputs) if node_run_result.inputs else None,
                process_data=json.dumps(node_run_result.process_data) if node_run_result.process_data else None,
                outputs=json.dumps(jsonable_encoder(node_run_result.outputs)) if node_run_result.outputs else None,
                execution_metadata=(json.dumps(jsonable_encoder(node_run_result.metadata))
                                    if node_run_result.metadata else None),
                status=WorkflowNodeExecutionStatus.SUCCEEDED.value,
                elapsed_time=time.perf_counter() - start_at,
                created_by_role=CreatedByRole.ACCOUNT.value,
                created_by=account.id,
                created_at=datetime.now(timezone.utc).replace(tzinfo=None),
                finished_at=datetime.now(timezone.utc).replace(tzinfo=None)
            )
        else:
            # create workflow node execution
            workflow_node_execution = WorkflowNodeExecution(
                tenant_id=app_model.tenant_id,
                app_id=app_model.id,
                workflow_id=draft_workflow.id,
                triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
                index=1,
                node_id=node_id,
                node_type=node_instance.node_type.value,
                title=node_instance.node_data.title,
                status=node_run_result.status.value,
                error=node_run_result.error,
                elapsed_time=time.perf_counter() - start_at,
                created_by_role=CreatedByRole.ACCOUNT.value,
                created_by=account.id,
                created_at=datetime.now(timezone.utc).replace(tzinfo=None),
                finished_at=datetime.now(timezone.utc).replace(tzinfo=None)
            )

        db.session.add(workflow_node_execution)
        db.session.commit()

        return workflow_node_execution

This method, run_draft_workflow_node, demonstrates how a workflow node is executed, including handling inputs, processing data, and outputs. The outputs are converted to JSON format using json.dumps and jsonable_encoder.
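For illustration, this is roughly what that serialization step does to a node's outputs (a minimal sketch, assuming jsonable_encoder behaves like its FastAPI counterpart):

import json

from core.model_runtime.utils.encoders import jsonable_encoder

# jsonable_encoder first converts non-JSON-native values (datetimes, pydantic
# models, FileVar objects, ...) into JSON-compatible ones; json.dumps then
# produces the string stored on the WorkflowNodeExecution row.
outputs = {"result": "Hello World!", "files": []}
stored = json.dumps(jsonable_encoder(outputs))
print(stored)  # {"result": "Hello World!", "files": []}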

To ensure that the output data of an embedded workflow is provided in the json property of the parent workflow's result, you can modify the onWorkflowFinished callback to include the output data in the json property. Here is the modified part of the code:

onWorkflowFinished: ({ data }) => {
  if (isTimeout)
    return
  if (data.error) {
    notify({ type: 'error', message: data.error })
    setWorkflowProccessData(produce(getWorkflowProccessData()!, (draft) => {
      draft.status = WorkflowRunningStatus.Failed
    }))
    setRespondingFalse()
    onCompleted(getCompletionRes(), taskId, false)
    isEnd = true
    return
  }
  setWorkflowProccessData(produce(getWorkflowProccessData()!, (draft) => {
    draft.status = WorkflowRunningStatus.Succeeded
  }))
  if (!data.outputs) {
    setCompletionRes('')
  }
  else {
    const outputData = { json: data.outputs };
    setCompletionRes(outputData)
    const isStringOutput = Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string'
    if (isStringOutput) {
      setWorkflowProccessData(produce(getWorkflowProccessData()!, (draft) => {
        draft.resultText = data.outputs[Object.keys(data.outputs)[0]]
      }))
    }
  }
  setRespondingFalse()
  setMessageId(tempMessageId)
  onCompleted(getCompletionRes(), taskId, true)
  isEnd = true
}

This modification ensures that the output data is wrapped in a json property before being set as the completion result [1][2][3].


akramer-zibra commented 1 month ago

Okay, thank you @dosu. I see that the outputs variable, in the context of WorkflowNodeExecution, is set with the json.dumps function on SUCCEEDED status. Please show me the Python code where a workflow is executed as a "tool", like the other built-in tools such as searxng, stable diffusion, jina, etc.

dosubot[bot] commented 1 month ago

To execute a workflow as a tool within the API service, you can refer to the WorkflowTool class in the api/core/tools/tool/workflow_tool.py file. This class demonstrates how a workflow is invoked as a tool, similar to other built-in tools like searxng, stable diffusion, and jina.

Here is the relevant Python code that shows how a workflow is executed as a tool:

import json
import logging
from copy import deepcopy
from typing import Any, Union

from core.file.file_obj import FileTransferMethod, FileVar
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolParameter, ToolProviderType
from core.tools.tool.tool import Tool
from extensions.ext_database import db
from models.account import Account
from models.model import App, EndUser
from models.workflow import Workflow

logger = logging.getLogger(__name__)

class WorkflowTool(Tool):
    workflow_app_id: str
    version: str
    workflow_entities: dict[str, Any]
    workflow_call_depth: int

    label: str

    """
    Workflow tool.
    """
    def tool_provider_type(self) -> ToolProviderType:
        """
            get the tool provider type

            :return: the tool provider type
        """
        return ToolProviderType.WORKFLOW

    def _invoke(
        self, user_id: str, tool_parameters: dict[str, Any]
    ) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
        """
            invoke the tool
        """
        app = self._get_app(app_id=self.workflow_app_id)
        workflow = self._get_workflow(app_id=self.workflow_app_id, version=self.version)

        # transform the tool parameters
        tool_parameters, files = self._transform_args(tool_parameters)

        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
        generator = WorkflowAppGenerator()
        result = generator.generate(
            app_model=app, 
            workflow=workflow, 
            user=self._get_user(user_id), 
            args={
                'inputs': tool_parameters,
                'files': files
            }, 
            invoke_from=self.runtime.invoke_from,
            stream=False,
            call_depth=self.workflow_call_depth + 1,
        )

        data = result.get('data', {})

        if data.get('error'):
            raise Exception(data.get('error'))

        result = []

        outputs = data.get('outputs', {})
        outputs, files = self._extract_files(outputs)
        for file in files:
            result.append(self.create_file_var_message(file))

        result.append(self.create_text_message(json.dumps(outputs, ensure_ascii=False)))
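        # ^ Note: the outputs dict is serialized into a plain text message here and
        # no JSON message is appended, which is why the parent workflow ends up
        # with an empty 'json' property (the behavior discussed in this issue).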

        return result

    def _get_user(self, user_id: str) -> Union[EndUser, Account]:
        """
            get the user by user id
        """
        user = db.session.query(EndUser).filter(EndUser.id == user_id).first()
        if not user:
            user = db.session.query(Account).filter(Account.id == user_id).first()

        if not user:
            raise ValueError('user not found')

        return user

    def fork_tool_runtime(self, runtime: dict[str, Any]) -> 'WorkflowTool':
        """
            fork a new tool with meta data

            :param meta: the meta data of a tool call processing, tenant_id is required
            :return: the new tool
        """
        return self.__class__(
            identity=deepcopy(self.identity),
            parameters=deepcopy(self.parameters),
            description=deepcopy(self.description),
            runtime=Tool.Runtime(**runtime),
            workflow_app_id=self.workflow_app_id,
            workflow_entities=self.workflow_entities,
            workflow_call_depth=self.workflow_call_depth,
            version=self.version,
            label=self.label
        )

    def _get_workflow(self, app_id: str, version: str) -> Workflow:
        """
            get the workflow by app id and version
        """
        if not version:
            workflow = db.session.query(Workflow).filter(
                Workflow.app_id == app_id, 
                Workflow.version != 'draft'
            ).order_by(Workflow.created_at.desc()).first()
        else:
            workflow = db.session.query(Workflow).filter(
                Workflow.app_id == app_id, 
                Workflow.version == version
            ).first()

        if not workflow:
            raise ValueError('workflow not found or not published')

        return workflow

    def _get_app(self, app_id: str) -> App:
        """
            get the app by app id
        """
        app = db.session.query(App).filter(App.id == app_id).first()
        if not app:
            raise ValueError('app not found')
        return app

    def _transform_args(self, tool_parameters: dict) -> tuple[dict, list[dict]]:
        """
            transform the tool parameters

            :param tool_parameters: the tool parameters
            :return: tool_parameters, files
        """
        parameter_rules = self.get_all_runtime_parameters()
        parameters_result = {}
        files = []
        for parameter in parameter_rules:
            if parameter.type == ToolParameter.ToolParameterType.FILE:
                file = tool_parameters.get(parameter.name)
                if file:
                    try:
                        file_var_list = [FileVar(**f) for f in file]
                        for file_var in file_var_list:
                            file_dict = {
                                'transfer_method': file_var.transfer_method.value,
                                'type': file_var.type.value,
                            }
                            if file_var.transfer_method == FileTransferMethod.TOOL_FILE:
                                file_dict['tool_file_id'] = file_var.related_id
                            elif file_var.transfer_method == FileTransferMethod.LOCAL_FILE:
                                file_dict['upload_file_id'] = file_var.related_id
                            elif file_var.transfer_method == FileTransferMethod.REMOTE_URL:
                                file_dict['url'] = file_var.preview_url

                            files.append(file_dict)
                    except Exception as e:
                        logger.exception(e)
            else:
                parameters_result[parameter.name] = tool_parameters.get(parameter.name)

        return parameters_result, files

    def _extract_files(self, outputs: dict) -> tuple[dict, list[FileVar]]:
        """
            extract files from the result
            :param result: the result
            :return: the result, files
        """
        files = []
        result = {}
        for key, value in outputs.items():
            if isinstance(value, list):
                has_file = False
                for item in value:
                    if isinstance(item, dict) and item.get('__variant') == 'FileVar':
                        try:
                            files.append(FileVar(**item))
                            has_file = True
                        except Exception as e:
                            pass
                if has_file:
                    continue

            result[key] = value

        return result, files

This code demonstrates how the WorkflowTool class handles the invocation of a workflow as a tool, including transforming parameters, invoking the workflow, and processing the outputs [1].


akramer-zibra commented 1 month ago

Beautiful, @dosu! How do I need to change the _invoke method of the WorkflowTool class so that the returned result variable also contains the outputs data in JSON format?

dosubot[bot] commented 1 month ago

To ensure that the returned result variable in the _invoke method of the WorkflowTool class also contains the outputs data in JSON format, you should modify the method to include the outputs data in the final result list. Here is the updated _invoke method:

def _invoke(
    self, user_id: str, tool_parameters: dict[str, Any]
) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
    """
        invoke the tool
    """
    app = self._get_app(app_id=self.workflow_app_id)
    workflow = self._get_workflow(app_id=self.workflow_app_id, version=self.version)

    # transform the tool parameters
    tool_parameters, files = self._transform_args(tool_parameters)

    from core.app.apps.workflow.app_generator import WorkflowAppGenerator
    generator = WorkflowAppGenerator()
    result = generator.generate(
        app_model=app, 
        workflow=workflow, 
        user=self._get_user(user_id), 
        args={
            'inputs': tool_parameters,
            'files': files
        }, 
        invoke_from=self.runtime.invoke_from,
        stream=False,
        call_depth=self.workflow_call_depth + 1,
    )

    data = result.get('data', {})

    if data.get('error'):
        raise Exception(data.get('error'))

    result = []

    outputs = data.get('outputs', {})
    outputs, files = self._extract_files(outputs)
    for file in files:
        result.append(self.create_file_var_message(file))

    # Add the outputs data in JSON format to the result
    result.append(self.create_text_message(json.dumps(outputs, ensure_ascii=False)))

    return result

In this updated method, the outputs data is converted to JSON format using json.dumps(outputs, ensure_ascii=False) and appended to the result list. This ensures that the result variable contains the outputs data in JSON format [1][2][3].


akramer-zibra commented 1 month ago

Hmm, @dosu. Your suggestion looks very much like the original code. Please tell me if there is a create_json_message method on the Tool class and, if so, how to use it.

dosubot[bot] commented 1 month ago

Yes, the Tool class does have a create_json_message method.

To use this method to include the outputs data in JSON format in the result variable, you can do the following:

def _invoke(
    self, user_id: str, tool_parameters: dict[str, Any]
) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
    """
        invoke the tool
    """
    app = self._get_app(app_id=self.workflow_app_id)
    workflow = self._get_workflow(app_id=self.workflow_app_id, version=self.version)

    # transform the tool parameters
    tool_parameters, files = self._transform_args(tool_parameters)

    from core.app.apps.workflow.app_generator import WorkflowAppGenerator
    generator = WorkflowAppGenerator()
    result = generator.generate(
        app_model=app, 
        workflow=workflow, 
        user=self._get_user(user_id), 
        args={
            'inputs': tool_parameters,
            'files': files
        }, 
        invoke_from=self.runtime.invoke_from,
        stream=False,
        call_depth=self.workflow_call_depth + 1,
    )

    data = result.get('data', {})

    if data.get('error'):
        raise Exception(data.get('error'))

    result = []

    outputs = data.get('outputs', {})
    outputs, files = self._extract_files(outputs)
    for file in files:
        result.append(self.create_file_var_message(file))

    # Add the outputs data in JSON format to the result using create_json_message
    result.append(self.create_json_message(outputs))

    return result

This will create a ToolInvokeMessage object with the outputs data in JSON format and store it in the result variable [1].
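For reference, create_json_message on the Tool base class is essentially a thin factory for a JSON-typed ToolInvokeMessage. A sketch of its shape (simplified; details may differ slightly between Dify versions):

def create_json_message(self, object: dict) -> ToolInvokeMessage:
    """
        create a json message
    """
    return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.JSON, message=object)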


akramer-zibra commented 1 month ago

Perfect. I think this is a 95% solution for this issue. I never expected this quality of support from a chatbot. Thank you very much for your untiring help, dosubot!