microsoft / TaskWeaver

A code-first agent framework for seamlessly planning and executing data analytics tasks.
https://microsoft.github.io/TaskWeaver/
MIT License
5.38k stars 689 forks source link

Facing issue while setting and getting session variables. #311

Closed shivakharbanda closed 7 months ago

shivakharbanda commented 7 months ago

let me first give code here

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from .managers.session_manager import UserSession
from taskweaver.app.app import TaskWeaverApp

import logging

# Configure logging: set the root logger to INFO and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Module-level registry of live chat sessions, keyed by the session_id taken
# from the WebSocket URL. ChatAIConsumer.connect() adds an entry and
# ChatAIConsumer.disconnect() removes it.
user_sessions = {}

# Project directory handed to TaskWeaver. A single TaskWeaverApp is created
# when this module is imported; consumers obtain their sessions from it.
app_dir = "metadata/project"
app = TaskWeaverApp(app_dir=app_dir)  # Initialize your AI app

class ChatAIConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that ties one chat connection to one TaskWeaver session.

    On connect a TaskWeaver session is created, stored in the module-level
    ``user_sessions`` registry under the URL's ``session_id``, and seeded with
    the ``datasource_id`` session variable. Incoming JSON messages are either
    ``{"type": "authenticate", "token": ...}`` or chat messages of the form
    ``{"message": ...}``.
    """

    async def connect(self):
        """Accept the socket and create the AI session for this connection."""
        # Extract session_id and datasource_id from the URL path
        url_kwargs = self.scope['url_route']['kwargs']
        self.session_id = url_kwargs['session_id']
        self.datasource_id = url_kwargs['datasource_id']

        logger.info(f"Attempting to connect: session_id={self.session_id}, datasource_id={self.datasource_id}")
        # Accept the WebSocket connection
        await self.accept()

        # Create a new AI session and store it in the user_sessions dictionary
        ai_client = app.get_session()  # Create a session for the AI client
        user_sessions[self.session_id] = UserSession(
            session_id=self.session_id,
            auth_token=None,  # Token will be set after authentication
            datasource_id=self.datasource_id,
            ai_client=ai_client,
        )

        # Expose the datasource to plugins through the TaskWeaver session variables
        user_sessions[self.session_id].ai_client.update_session_var(
            variables={"datasource_id": self.datasource_id},
        )

        logger.info(f"WebSocket connection accepted and AI session created for session_id={self.session_id}")

    async def disconnect(self, close_code):
        """Remove this connection's session from the registry and stop it.

        :param close_code: WebSocket close code supplied by Channels (unused).
        """
        session = user_sessions.pop(self.session_id, None)
        if session:
            session.ai_client.stop()  # Ensure session cleanup
            logger.info(f"Session {self.session_id} disconnected and cleaned up")

    async def receive(self, text_data=None, bytes_data=None):
        """Handle one incoming frame: an authentication request or a chat message.

        :param text_data: JSON text of the frame, or None for a binary frame.
        :param bytes_data: Payload of a binary frame. Channels passes this
            keyword for binary frames, so it must be accepted; binary frames
            are ignored.
        """
        if text_data is None:
            # Binary frames carry no JSON payload this consumer understands.
            logger.warning(f"Ignoring non-text frame for session_id={self.session_id}")
            return

        # Process incoming messages from the user
        text_data_json = json.loads(text_data)

        # Never write the auth token to the logs; mask it before logging.
        loggable = {
            key: ("<redacted>" if key == "token" else value)
            for key, value in text_data_json.items()
        }
        logger.info(f"Received message: {loggable}")

        # Check for the expected message type
        if text_data_json.get('type') == 'authenticate':
            # Handle authentication
            auth_token = text_data_json.get('token')
            session_id = self.scope['url_route']['kwargs']['session_id']
            datasource_id = self.scope['url_route']['kwargs']['datasource_id']
            if not await self.authenticate_token(auth_token):
                # Close the connection if the token is invalid
                logger.error(f"Authentication failed for session_id={session_id}")
                await self.close(code=4001)
                return

            # Update the auth token in the user session and make it visible
            # to plugins as a session variable.
            user_sessions[session_id].auth_token = auth_token
            user_sessions[session_id].ai_client.update_session_var(variables={"auth_token": auth_token})

            logger.info(f"User authenticated successfully for session_id={session_id} and ds id {datasource_id}")

            await self.send(text_data=json.dumps({"message": "Authenticated successfully"}))
            return

        # Handle other message types, such as AI chat messages
        message = text_data_json.get("message")
        session = user_sessions.get(self.session_id)
        # NOTE(review): this only checks that a session exists; it does not
        # verify that the session has been authenticated. Confirm whether
        # chat messages should be rejected until a token has been set.
        if session and session.ai_client:
            # Use the session's AI client to handle the message and get a response
            ai_response = await self.handle_ai_response(message, session.ai_client)
            logger.info(f"Message processed and response sent for session_id={self.session_id}")
            await self.send(text_data=json.dumps({"message": ai_response}))
        else:
            # Session not found or message received before authentication
            await self.send(text_data=json.dumps({"error": "Unauthorized"}))
            logger.warning(f"Unauthorized access attempt or session not found for session_id={self.session_id}")

    async def handle_ai_response(self, message, ai_client):
        """Send ``message`` to the AI session and return the reply as a dict.

        :param message: User text to forward to the AI session.
        :param ai_client: Session object stored as ``UserSession.ai_client``.
        :return: The response round converted with ``to_dict()``.
        """
        # NOTE(review): send_message is called synchronously inside a
        # coroutine; if it blocks it will stall the event loop. Consider
        # running it in a worker thread.
        response_round = ai_client.send_message(message)
        # Convert the AI's response to a dictionary format and send it back
        response_dict = response_round.to_dict()
        logger.info(f"AI response for message: {message}, response: {response_dict}")
        return response_dict

    async def authenticate_token(self, token):
        """Validate an auth token. Placeholder: currently accepts every token.

        :param token: Token supplied by the client in the authenticate message.
        :return: True if the token is valid.
        """
        # The token itself is a secret and is deliberately not logged.
        logger.info("Authenticating token")
        # Implement actual token authentication logic here
        # For now, assuming all tokens are valid
        return True

This is my consumers file, where I set the session variables according to the documentation. However, inside the plugin the variables are not available:

import requests
from taskweaver.plugin import Plugin, register_plugin

from dotenv import load_dotenv
import os 
load_dotenv()

# Host and port of the metadata API, read from the environment (populated by
# load_dotenv() above). The second argument is the fallback used when the
# variable is unset; both values are strings.
API_HOST = os.getenv("API_HOST", "192.168.1.47")
API_PORT = os.getenv("API_PORT", "8000")

@register_plugin
class MetadataFetchPlugin(Plugin):
    """TaskWeaver plugin that fetches datasource metadata from the metadata API."""

    def __call__(self):
        """
        Fetches metadata of the datasource whose ID is stored in the session.

        The plugin takes no arguments: the datasource ID is read from the
        ``datasource_id`` session variable.

        :return: A tuple containing the metadata (or None on failure) and a description string.
        :raises ValueError: If the ``datasource_id`` session variable is not set.
        """
        datasource_id = self.ctx.get_session_var("datasource_id")

        if datasource_id is None:
            raise ValueError("No datasource ID found in session. stop the execution and ask user to update session properly")

        # Build the full URL using environment variables
        url = f"http://{API_HOST}:{API_PORT}/api/metadata/fetch/{datasource_id}/"

        try:
            # Without a timeout, requests waits indefinitely on an unresponsive
            # server, which would hang the whole code-execution round.
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            metadata = None
            description = f"An error occurred while fetching metadata for datasource ID {datasource_id}. Error: {str(e)}"
            return metadata, description

        # Use tolerant lookups so an unexpected payload shape is reported in
        # the description rather than raising KeyError/TypeError.
        payload = data if isinstance(data, dict) else {}
        if payload.get("status") == "success":
            metadata = payload.get("data")
            description = f"Metadata fetched successfully for datasource ID {datasource_id}."
        else:
            metadata = None
            description = f"Failed to fetch metadata for datasource ID {datasource_id}. Message: {payload.get('message')}"

        return metadata, description

I am not able to read the variables I set, and I cannot figure out what I am doing wrong.

shivakharbanda commented 7 months ago

HOW DO I DEBUG THE WHOLE THING I AM NOT ABLE TO UNDERSTAND. PLEASE HELP

shivakharbanda commented 7 months ago

i am pretty sure i am passing the session variables correctly as you can see below


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:taskweaver.logging:LLM output: {"response": [{"type": "init_plan", "content": "1. Fetch metadata of the datasource"}, {"type": "plan", "content": "1. Fetch metadata of the datasource"}, {"type": "current_plan_step", "content": "1. Fetch metadata of the datasource"}, {"type": "send_to", "content": "User"}, {"type": "message", "content": "The execution of fetching metadata failed due to a missing datasource ID. Please provide the correct datasource ID to proceed."}]}
INFO:taskweaver.logging:Planner talk to User: The execution of fetching metadata failed due to a missing datasource ID. Please provide the correct datasource ID to proceed.
INFO:metadata.consumers:AI response for message: Fetch the metadata of the datasource, response: {'id': 'round-20240421-113341-64dc3d28', 'user_query': 'Fetch the metadata of the datasource', 'state': 'finished', 'post_list': [{'id': 'post-20240421-113341-0c3a0f31', 'message': 'Fetch the metadata of the datasource', 'send_from': 'User', 'send_to': 'Planner', 'attachment_list': []}, {'id': 'post-20240421-113341-146f69b7', 'message': 'Please fetch the metadata of the datasource', 'send_from': 'Planner', 'send_to': 'CodeInterpreter', 'attachment_list': [{'id': 'atta-20240421-113343-cb5024fb', 'type': 'init_plan', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113343-ebed90f6', 'type': 'plan', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113344-f1b3a65c', 'type': 'current_plan_step', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113344-a334c861', 'type': 'board', 'content': 'I have drawn up a plan: \n1. Fetch metadata of the datasource\n\nPlease proceed with this step of this plan: Please fetch the metadata of the datasource', 'extra': None}]}, {'id': 'post-20240421-113344-fc6934ab', 'message': 'The following python code has been executed:\n```python\n# Fetch the metadata of the datasource\nmetadata, description = metadata_fetch()\nmetadata\n```\n\n\nThe execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[1], line 2\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. 
stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'send_from': 'CodeInterpreter', 'send_to': 'CodeInterpreter', 'attachment_list': [{'id': 'atta-20240421-113347-987930bb', 'type': 'python', 'content': '# Fetch the metadata of the datasource\nmetadata, description = metadata_fetch()\nmetadata', 'extra': None}, {'id': 'atta-20240421-113347-5c2a5777', 'type': 'verification', 'content': 'NONE', 'extra': None}, {'id': 'atta-20240421-113347-aeff8e03', 'type': 'code_error', 'content': 'No code verification is performed.', 'extra': None}, {'id': 'atta-20240421-113356-cef7f8f8', 'type': 'execution_status', 'content': 'FAILURE', 'extra': None}, {'id': 'atta-20240421-113356-de672ecb', 'type': 'execution_result', 'content': 'The execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[1], line 2\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'extra': None}, {'id': 'atta-20240421-113356-d9732e08', 'type': 'artifact_paths', 'content': [], 'extra': None}, {'id': 'atta-20240421-113356-e2265e13', 'type': 'revise_message', 'content': 'The execution of the previous generated code has failed. 
If you think you can fix the problem by rewriting the code, please generate code and run it again.\nOtherwise, please explain the problem to me.', 'extra': None}]}, {'id': 'post-20240421-113356-3e022c28', 'message': "The following python code has been executed:\n```python\n# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata, description = metadata_fetch(datasource_id=123)\nmetadata\n```\n\n\nThe execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[2], line 4\n    metadata, description = metadata_fetch(datasource_id=123)\n\nTypeError: MetadataFetchPlugin.__call__() got an unexpected keyword argument 'datasource_id'\n\n", 'send_from': 'CodeInterpreter', 'send_to': 'CodeInterpreter', 'attachment_list': [{'id': 'atta-20240421-113359-a1fc8ed2', 'type': 'thought', 'content': 'ProgramApe needs to ensure that the datasource ID is properly set before fetching the metadata.', 'extra': None}, {'id': 'atta-20240421-113359-abd9804c', 'type': 'python', 'content': '# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata, description = metadata_fetch(datasource_id=123)\nmetadata', 'extra': None}, {'id': 'atta-20240421-113400-5d675af5', 'type': 'verification', 'content': 'NONE', 'extra': None}, {'id': 'atta-20240421-113400-564b4c6a', 'type': 'code_error', 'content': 'No code verification is performed.', 'extra': None}, {'id': 'atta-20240421-113401-3954b9c6', 'type': 'execution_status', 'content': 'FAILURE', 'extra': None}, {'id': 'atta-20240421-113401-2e20cdec', 'type': 'execution_result', 'content': "The execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[2], line 4\n    metadata, description = metadata_fetch(datasource_id=123)\n\nTypeError: MetadataFetchPlugin.__call__() got an unexpected keyword argument 
'datasource_id'\n\n", 'extra': None}, {'id': 'atta-20240421-113401-b15460ff', 'type': 'artifact_paths', 'content': [], 'extra': None}, {'id': 'atta-20240421-113401-fdb486b3', 'type': 'revise_message', 'content': 'The execution of the previous generated code has failed. If you think you can fix the problem by rewriting the code, please generate code and run it again.\nOtherwise, please explain the problem to me.', 'extra': None}]}, {'id': 'post-20240421-113401-0079243a', 'message': 'The following python code has been executed:\n```python\n# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata = {}\ndescription = ""\nmetadata, description = metadata_fetch()\nmetadata\n```\n\n\nThe execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[3], line 6\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. 
stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'send_from': 'CodeInterpreter', 'send_to': 'CodeInterpreter', 'attachment_list': [{'id': 'atta-20240421-113404-7473d031', 'type': 'python', 'content': '# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata = {}\ndescription = ""\nmetadata, description = metadata_fetch()\nmetadata', 'extra': None}, {'id': 'atta-20240421-113406-446396e7', 'type': 'verification', 'content': 'NONE', 'extra': None}, {'id': 'atta-20240421-113406-c433b440', 'type': 'code_error', 'content': 'No code verification is performed.', 'extra': None}, {'id': 'atta-20240421-113407-fcc63d58', 'type': 'execution_status', 'content': 'FAILURE', 'extra': None}, {'id': 'atta-20240421-113407-0c801b17', 'type': 'execution_result', 'content': 'The execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[3], line 6\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'extra': None}, {'id': 'atta-20240421-113407-950ed620', 'type': 'artifact_paths', 'content': [], 'extra': None}, {'id': 'atta-20240421-113407-7dffe03c', 'type': 'revise_message', 'content': 'The execution of the previous generated code has failed. 
If you think you can fix the problem by rewriting the code, please generate code and run it again.\nOtherwise, please explain the problem to me.', 'extra': None}]}, {'id': 'post-20240421-113407-3a95a544', 'message': 'The following python code has been executed:\n```python\n# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata = {}\ndescription = ""\nmetadata, description = metadata_fetch()\nmetadata\n```\n\n\nThe execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[4], line 6\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'send_from': 'CodeInterpreter', 'send_to': 'Planner', 'attachment_list': [{'id': 'atta-20240421-113410-17d661b8', 'type': 'python', 'content': '# Set the datasource ID properly\n\n# Assuming the datasource ID is 123\nmetadata = {}\ndescription = ""\nmetadata, description = metadata_fetch()\nmetadata', 'extra': None}, {'id': 'atta-20240421-113411-b8c79a8d', 'type': 'verification', 'content': 'NONE', 'extra': None}, {'id': 'atta-20240421-113411-7f6c9f2a', 'type': 'code_error', 'content': 'No code verification is performed.', 'extra': None}, {'id': 'atta-20240421-113412-0cda3d00', 'type': 'execution_status', 'content': 'FAILURE', 'extra': None}, {'id': 'atta-20240421-113412-879dc071', 'type': 'execution_result', 'content': 'The execution of the generated python code above has failed\n\nDuring execution, the following messages were logged:\nTraceback (most recent call last):\n\n  Cell In[4], line 6\n    metadata, description = metadata_fetch()\n\n  File /tmp/tmpr5_yjs9f/metadata_fetch.py:27 in __call__\n\nValueError: No datasource ID found in session. 
stop the execution and ask user to update session properly\n\n<taskweaver.ces.runtime.context.ExecutorPluginContext object at 0x7f9f0d2b4a90>\n\n', 'extra': None}, {'id': 'atta-20240421-113412-9fd22251', 'type': 'artifact_paths', 'content': [], 'extra': None}]}, {'id': 'post-20240421-113412-dfa2b63b', 'message': 'The execution of fetching metadata failed due to a missing datasource ID. Please provide the correct datasource ID to proceed.', 'send_from': 'Planner', 'send_to': 'User', 'attachment_list': [{'id': 'atta-20240421-113413-4ff3eff8', 'type': 'init_plan', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113414-316410a1', 'type': 'plan', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113414-2945921c', 'type': 'current_plan_step', 'content': '1. Fetch metadata of the datasource', 'extra': None}, {'id': 'atta-20240421-113415-befdcd1c', 'type': 'board', 'content': 'I have drawn up a plan: \n1. Fetch metadata of the datasource\n\nPlease proceed with this step of this plan: The execution of fetching metadata failed due to a missing datasource ID. Please provide the correct datasource ID to proceed.', 'extra': None}]}]}
> /app/metadata/consumers.py(91)receive()
-> logger.info(f"Message processed and response sent for session_id={self.session_id}")
(Pdb) session_var
*** NameError: name 'session_var' is not defined
(Pdb) self.session
*** AttributeError: 'ChatAIConsumer' object has no attribute 'session'
(Pdb) self.ai_session
*** AttributeError: 'ChatAIConsumer' object has no attribute 'ai_session'
(Pdb) l
 86                 message = text_data_json.get("message")
 87                 session = user_sessions.get(self.session_id)
 88                 if session and session.ai_client:
 89                     # Use the session's AI client to handle the message and get a response
 90                     ai_response = await self.handle_ai_response(message, session.ai_client)
 91  ->                 logger.info(f"Message processed and response sent for session_id={self.session_id}")
 92                     await self.send(text_data=json.dumps({"message": ai_response}))
 93                 else:
 94                     # Session not found or message received before authentication
 95                     await self.send(text_data=json.dumps({"error": "Unauthorized"}))
 96                     logger.warning(f"Unauthorized access attempt or session not found for session_id={self.session_id}")
(Pdb) session.ai_client
<taskweaver.session.session.Session object at 0x7ebd24673f10>
(Pdb) session.ai_client.session_var
{'datasource_id': '34'}
(Pdb) 

I have debugged it, and as you can see, session_var is being set on the session, but the plugin is not able to read it.

shivakharbanda commented 7 months ago

@liqul can you please help

liqul commented 7 months ago

@liqul can you please help

There is currently an issue introduced by recent refactoring: session variables cannot be updated. A fix is on the way, and I will let you know when it is merged.

shivakharbanda commented 7 months ago

I pulled the branch from your open pull request, made one small change, and now it works.

the below code is on line number 434 in environment.py

def _get_session(
    self,
    session_id: str,
    session_dir: Optional[str] = None,
) -> EnvSession:
    """Return the session registered under ``session_id``, creating it on first use.

    :param session_id: Key into ``self.session_dict``.
    :param session_dir: Directory to use for a newly created session; when
        None, the result of ``self._get_default_session_dir(session_id)`` is
        used. Ignored if the session already exists.
    :return: The entry stored in ``self.session_dict`` for ``session_id``.
    """
    # Already registered: hand back the stored entry unchanged.
    if session_id in self.session_dict:
        return self.session_dict.get(session_id, None)

    created = EnvSession(session_id)
    if session_dir is None:
        created.session_dir = self._get_default_session_dir(session_id)
    else:
        created.session_dir = session_dir

    # Make sure the session's working directory exists before registering it.
    os.makedirs(created.session_dir, exist_ok=True)
    self.session_dict[session_id] = created

    return self.session_dict.get(session_id, None)

You had a `session_dir is not None` check here, which caused the issue because `session_dir` was None. After removing that check it works fine, and I am able to get the data from the session.

@liqul