HITjunle opened this issue 1 month ago
Hello @HITjunle! I'm here to help you with any questions or issues you may have. Let's work together to solve this problem.
To make PandasAI return a list like `[average_age, min_age, max_age]` instead of a string with extra text, you can extend the `ResponseParser` class and override its `format_dataframe` method to return the desired list format. Here is an example of how you can achieve this:
```python
import os

import pandas as pd

from pandasai import SmartDatalake
from pandasai.responses.response_parser import ResponseParser


class CustomResponseParser(ResponseParser):
    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        # Assuming result is a dataframe with an 'Age' column
        average_age = result["Age"].mean()
        min_age = result["Age"].min()
        max_age = result["Age"].max()
        return [average_age, min_age, max_age]


# Example DataFrame
singers_df = pd.DataFrame(
    {
        "Name": ["Singer1", "Singer2", "Singer3", "Singer4"],
        "Age": [25, 30, 43, 40],
        "Country": ["France", "France", "France", "France"],
    }
)

# Set your API key
os.environ["PANDASAI_API_KEY"] = "YOUR_API_KEY"

# Initialize the SmartDatalake with the custom response parser
agent = SmartDatalake(
    [singers_df],
    config={"verbose": True, "response_parser": CustomResponseParser},
)

question = "What is the average, minimum, and maximum age for all French singers?"
response = agent.chat(question)
print(response)  # Output will be in the format [average_age, min_age, max_age]
```
This code defines a custom response parser that calculates the average, minimum, and maximum age from the DataFrame and returns them as a list [1].
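To sanity-check what the custom parser should return, the same three statistics can be computed with plain pandas, independent of PandasAI. This is a minimal sketch; the casts to built-in `float`/`int` are there only to keep NumPy scalar types out of the printed list:

```python
import pandas as pd

singers_df = pd.DataFrame(
    {
        "Name": ["Singer1", "Singer2", "Singer3", "Singer4"],
        "Age": [25, 30, 43, 40],
        "Country": ["France", "France", "France", "France"],
    }
)

# Filter to French singers, then build the [average, min, max] list directly
french = singers_df[singers_df["Country"] == "France"]
stats = [
    float(french["Age"].mean()),
    int(french["Age"].min()),
    int(french["Age"].max()),
]
print(stats)  # [34.5, 25, 43]
```

Whatever the LLM-generated code produces, the custom `format_dataframe` above should yield a list of this shape.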
To continue talking to Dosu, mention @dosu.
@dosu Can you explain the difference between the output of agent.generate_code and agent.execute_code?
The difference between the output of `agent.generate_code` and `agent.execute_code` in PandasAI is as follows:

- `agent.generate_code`: generates code based on the provided query but does not execute it. It returns the generated code as a string.

```python
def generate_code(self, query: str, output_type: Optional[str] = None):
    ...
    return self.pipeline.run_generate_code(pipeline_input)
```

- `agent.execute_code`: executes the code that was generated (either provided directly or the last generated code) and returns the result of the execution.

```python
def execute_code(self, code: Optional[str] = None, output_type: Optional[str] = None):
    ...
    return self.pipeline.run_execute_code(pipeline_input)
```
The `generate_code` method uses the `run_generate_code` function from the `GenerateChatPipeline` class, which involves several steps: validating the input, checking the cache, generating the prompt, generating the code, and cleaning the code. The `execute_code` method uses the `run_execute_code` function from the `CodeExecution` class, which involves executing the code, validating the result, and parsing the result. The `execute_code` method also handles retries and error correction if the initial execution fails [1][2][3].
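The split can be illustrated with a minimal, self-contained sketch (not the PandasAI implementation): one function that only returns code as a string, and another that actually runs it and reads back `result`:

```python
def generate_code(query: str) -> str:
    # Stand-in for agent.generate_code: produce code but do not run it.
    # (A real agent would ask the LLM; the string here is hard-coded.)
    return "result = sum(ages) / len(ages)"


def execute_code(code: str, ages: list) -> float:
    # Stand-in for agent.execute_code: run the code and retrieve `result`.
    env = {"ages": ages}
    exec(code, env)
    return env["result"]


code = generate_code("What is the average age?")
print(code)  # the generated code itself, as a string
print(execute_code(code, [25, 30, 43, 40]))  # 34.5
```

The first call gives you something to inspect or edit; only the second one produces a value.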
Could you please explain `self.pipeline.run_execute_code(pipeline_input)` in detail?
The `self.pipeline.run_execute_code(pipeline_input)` method in PandasAI involves several steps to execute the code, validate the result, and handle retries and error correction if the initial execution fails. Here is a detailed explanation of how this method works:

1. **Initialization and Setup**: The pipeline context, dataframes, config, additional dependencies, and logger are pulled from the keyword arguments, and a `CodeExecutionContext` is created.
2. **Code Execution Loop**: The code is run in a loop that allows up to `self.context.config.max_retries` retries, with each attempt calling the `execute_code` method.
3. **Code Execution**: The `execute_code` method runs the provided Python code in a controlled environment using Python's `exec` function, and the result is retrieved from the environment.
4. **Result Validation**: The result is validated against the expected output type using `OutputValidator`.
5. **Error Handling and Retries**: If execution fails, the error is logged and passed to the `_retry_run_code` method, which can modify the code based on the error before the next attempt.
6. **Final Output**: On success, the result is wrapped in a `LogicUnitOutput` object and returned.

Here is the relevant code snippet that demonstrates these steps:
```python
class CodeExecution(BaseLogicUnit):
    def execute(self, input: Any, **kwargs) -> Any:
        self.context: PipelineContext = kwargs.get("context")
        self._dfs = self.context.dfs
        self._config = self.context.config
        self._additional_dependencies = self.context.get("additional_dependencies", [])
        self._current_code_executed = self.context.get("current_code_executed")
        self.logger: Logger = kwargs.get("logger")

        code_context = CodeExecutionContext(
            self.context.get("last_prompt_id"), self.context.skills_manager
        )

        retry_count = 0
        code_to_run = input
        result = None
        while retry_count <= self.context.config.max_retries:
            try:
                result = self.execute_code(code_to_run, code_context)
                if self.context.get("output_type") != "" and (
                    output_helper := self.context.get("output_type")
                ):
                    (validation_ok, validation_errors) = OutputValidator.validate(
                        output_helper, result
                    )
                    if not validation_ok:
                        raise InvalidLLMOutputType(validation_errors)

                if not OutputValidator.validate_result(result):
                    raise InvalidOutputValueMismatch(
                        f'Value type {type(result["value"])} must match with type {result["type"]}'
                    )

                break

            except Exception as e:
                traceback_errors = traceback.format_exc()
                self.logger.log(f"Failed with error: {traceback_errors}", logging.ERROR)

                if self.on_failure:
                    self.on_failure(code_to_run, traceback_errors)

                if (
                    not self.context.config.use_error_correction_framework
                    or retry_count >= self.context.config.max_retries
                ):
                    raise e

                retry_count += 1

                self.logger.log(
                    f"Failed to execute code retrying with a correction framework "
                    f"[retry number: {retry_count}]",
                    level=logging.WARNING,
                )

                code_to_run = self._retry_run_code(
                    code_to_run, self.context, self.logger, e
                )

        return LogicUnitOutput(
            result,
            True,
            "Code Executed Successfully",
            {"content_type": "response", "value": ResponseSerializer.serialize(result)},
            final_track_output=True,
        )

    def execute_code(self, code: str, context: CodeExecutionContext) -> Any:
        dfs = self._required_dfs(code)
        environment: dict = get_environment(self._additional_dependencies)
        environment["dfs"] = self._get_originals(dfs)
        if len(environment["dfs"]) == 1:
            environment["df"] = environment["dfs"][0]

        if self._config.direct_sql:
            environment["execute_sql_query"] = self._dfs[0].execute_direct_sql_query

        if context.skills_manager.used_skills:
            for skill_func_name in context.skills_manager.used_skills:
                skill = context.skills_manager.get_skill_by_func_name(skill_func_name)
                environment[skill_func_name] = skill

        exec(code, environment)

        if "result" not in environment:
            raise NoResultFoundError("No result returned")

        return environment["result"]
```
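Stripped of the pipeline machinery, the core of `execute_code` is the exec-into-an-environment pattern. Here is a minimal sketch of that pattern; the helper name and the plain-list "dataframes" are illustrative stand-ins, not PandasAI API:

```python
class NoResultFoundError(Exception):
    pass


def run_in_environment(code: str, dfs: list):
    # Build the execution environment and inject the dataframes,
    # mirroring how execute_code exposes `dfs` (and `df` when there is one).
    environment = {"dfs": dfs}
    if len(dfs) == 1:
        environment["df"] = dfs[0]

    exec(code, environment)

    # The generated code is expected to assign to a `result` variable.
    if "result" not in environment:
        raise NoResultFoundError("No result returned")
    return environment["result"]


print(run_in_environment("result = max(df)", [[25, 30, 43, 40]]))  # 43
```

The generated code communicates its answer purely by assigning to `result` inside the environment dict, which is why PandasAI raises `NoResultFoundError` when that variable is missing.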
Additionally, the `GenerateChatPipeline` class in `pandasai/pipelines/chat/generate_chat_pipeline.py` provides further context on how the pipeline is structured and how error correction is handled:
```python
class GenerateChatPipeline:
    code_generation_pipeline = Pipeline
    code_execution_pipeline = Pipeline
    context: PipelineContext
    _logger: Logger
    last_error: str

    def __init__(
        self,
        context: Optional[PipelineContext] = None,
        logger: Optional[Logger] = None,
        judge: BaseJudge = None,
        on_prompt_generation=None,
        on_code_generation=None,
        before_code_execution=None,
        on_result=None,
    ):
        self.query_exec_tracker = QueryExecTracker(
            server_config=context.config.log_server
        )

        self.code_generation_pipeline = Pipeline(
            context=context,
            logger=logger,
            query_exec_tracker=self.query_exec_tracker,
            steps=[
                ValidatePipelineInput(),
                CacheLookup(),
                PromptGeneration(
                    skip_if=self.is_cached,
                    on_execution=on_prompt_generation,
                ),
                CodeGenerator(
                    skip_if=self.is_cached,
                    on_execution=on_code_generation,
                ),
                CachePopulation(skip_if=self.is_cached),
                CodeCleaning(
                    skip_if=self.no_code,
                    on_failure=self.on_code_cleaning_failure,
                    on_retry=self.on_code_retry,
                ),
            ],
        )

        self.code_execution_pipeline = Pipeline(
            context=context,
            logger=logger,
            query_exec_tracker=self.query_exec_tracker,
            steps=[
                CodeExecution(
                    before_execution=before_code_execution,
                    on_failure=self.on_code_execution_failure,
                    on_retry=self.on_code_retry,
                ),
                ResultValidation(),
                ResultParsing(
                    before_execution=on_result,
                ),
            ],
        )

        self.code_exec_error_pipeline = ErrorCorrectionPipeline(
            context=context,
            logger=logger,
            query_exec_tracker=self.query_exec_tracker,
            on_code_generation=on_code_generation,
            on_prompt_generation=on_prompt_generation,
        )

        self.judge = judge

        if self.judge:
            if self.judge.pipeline.pipeline.context:
                self.judge.pipeline.pipeline.context.memory = context.memory
            else:
                self.judge.pipeline.pipeline.context = context

            self.judge.pipeline.pipeline.logger = logger
            self.judge.pipeline.pipeline.query_exec_tracker = self.query_exec_tracker

        self.context = context
        self._logger = logger
        self.last_error = None

    def on_code_execution_failure(self, code: str, errors: Exception) -> str:
        """
        Executes on code execution failure
        Args:
            code (str): code that is ran
            exception (Exception): exception that is raised during code execution

        Returns:
            str: returns the updated code with the fixes
        """
        # Add information about the code failure in the query tracker for debug
        self.query_exec_tracker.add_step(
            {
                "type": "CodeExecution",
                "success": False,
                "message": "Failed to execute code",
                "execution_time": None,
                "data": {
                    "content_type": "code",
                    "value": code,
                    "exception": errors,
                },
            }
        )
        correction_input = ErrorCorrectionPipelineInput(code, errors)
        return self.code_exec_error_pipeline.run(correction_input)

    def on_code_cleaning_failure(self, code, errors):
        # Add information about the code failure in the query tracker for debug
        self.query_exec_tracker.add_step(
            {
                "type": "CodeCleaning",
                "success": False,
                "message": "Failed to clean code",
                "execution_time": None,
                "data": {
                    "content_type": "code",
                    "value": code,
                    "exception": errors,
                },
            }
        )

    def on_code_retry(self, code: str, exception: Exception):
        correction_input = ErrorCorrectionPipelineInput(code, exception)
        return self.code_exec_error_pipeline.run(correction_input)

    def no_code(self, context: PipelineContext):
        return context.get("last_code_generated") is None

    def is_cached(self, context: PipelineContext):
        return context.get("found_in_cache")

    def get_last_track_log_id(self):
        return self.query_exec_tracker.last_log_id

    def run_generate_code(self, input: ChatPipelineInput) -> dict:
        """
        Executes the code generation pipeline with user input and return the result
        Args:
            input (ChatPipelineInput): _description_

        Returns:
            The `output` dictionary is expected to have the following keys:
            - 'type': The type of the output.
            - 'value': The value of the output.
        """
        self._logger.log(f"Executing Pipeline: {self.__class__.__name__}")

        # Reset intermediate values
        self.context.reset_intermediate_values()

        # Start New Tracking for Query
        self.query_exec_tracker.start_new_track(input)
        self.query_exec_tracker.add_skills(self.context)
        self.query_exec_tracker.add_dataframes(self.context.dfs)

        # Add Query to memory
        self.context.memory.add(input.query, True)

        self.context.add_many(
            {
                "output_type": input.output_type,
                "last_prompt_id": input.prompt_id,
            }
        )
        try:
            output = self.code_generation_pipeline.run(input)
            self.query_exec_tracker.success = True
            self.query_exec_tracker.publish()
            return output

        except Exception as e:
            # Show the full traceback
            import traceback

            traceback.print_exc()

            self.last_error = str(e)
            self.query_exec_tracker.success = False
            self.query_exec_tracker.publish()

            return (
                "Unfortunately, I was not able to answer your question, "
                "because of the following error:\n"
                f"\n{e}\n"
            )

    def run_execute_code(self, input: CodeExecutionPipelineInput) -> dict:
        """
        Executes the chat pipeline with user input and return the result
        Args:
            input (CodeExecutionPipelineInput): _description_

        Returns:
            The `output` dictionary is expected to have the following keys:
            - 'type': The type of the output.
            - 'value': The value of the output.
        """
        self._logger.log(f"Executing Pipeline: {self.__class__.__name__}")

        # Reset intermediate values
        self.context.reset_intermediate_values()

        # Start New Tracking for Query
        self.query_exec_tracker.start_new_track(input)
        self.query_exec_tracker.add_skills(self.context)
        self.query_exec_tracker.add_dataframes(self.context.dfs)

        # Add Query to memory
        self.context.memory.add(input.code, True)

        self.context.add_many(
            {
                "output_type": input.output_type,
                "last_prompt_id": input.prompt_id,
            }
        )
        try:
            output = self.code_execution_pipeline.run(input.code)
            self.query_exec_tracker.success = True
            self.query_exec_tracker.publish()
            return output

        except Exception as e:
            # Show the full traceback
            import traceback

            traceback.print_exc()

            self.last_error = str(e)
            self.query_exec_tracker.success = False
            self.query_exec_tracker.publish()

            return (
                "Unfortunately, I was not able to answer your question, "
                "because of the following error:\n"
                f"\n{e}\n"
            )
```
This code provides a comprehensive view of how the `self.pipeline.run_execute_code(pipeline_input)` method works, including code execution, validation, and error handling with retries [1][2][3].
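The retry logic described above can be reduced to a small sketch: try to execute, and on failure hand the code plus the exception to a corrector (playing the role of `_retry_run_code`) until `max_retries` is exhausted. The executor and corrector below are toy stand-ins, not PandasAI code:

```python
def run_with_retries(code: str, executor, corrector, max_retries: int = 3):
    retry_count = 0
    while True:
        try:
            return executor(code)
        except Exception as exc:
            if retry_count >= max_retries:
                raise  # out of retries, surface the error
            retry_count += 1
            # Let the corrector rewrite the code based on the error,
            # as _retry_run_code does via the LLM.
            code = corrector(code, exc)


def executor(code: str):
    env = {}
    exec(code, env)
    return env["result"]


def corrector(code: str, exc: Exception) -> str:
    # A real corrector would prompt the LLM with the traceback;
    # here we just fix a known typo.
    return code.replace("resultt", "result")


print(run_with_retries("resultt = 1 + 1", executor, corrector))  # 2
```

The first attempt fails with a `KeyError` (no `result` in the environment), the corrector repairs the typo, and the second attempt succeeds.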
I have a question: What is the average, minimum, and maximum age for all French singers? I use the following code

and the output is

I don't want this kind of result output; I need the output to be a list, like `[{average_age}, {min_age}, {max_age}]`, without the extra string. How can I modify the response of PandasAI?