tensorlakeai / indexify

A realtime serving engine for Data-Intensive Generative AI Applications
https://docs.tensorlake.ai
Apache License 2.0

Improve Error Handling in Indexify Python SDK #891

Open PulkitMishra opened 1 month ago

PulkitMishra commented 1 month ago

Issue Description

The current implementation of the Indexify Python SDK lacks robust error handling and reporting mechanisms.

Specific Examples

  1. In indexify/remote_client.py, the _request method:
def _request(self, method: str, **kwargs) -> httpx.Response:
    try:
        response = self._client.request(method, timeout=self._timeout, **kwargs)
        status_code = str(response.status_code)
        if status_code.startswith("4"):
            raise ApiException(
                "status code: " + status_code + " request args: " + str(kwargs)
            )
        if status_code.startswith("5"):
            raise ApiException(response.text)
    except httpx.ConnectError:
        message = (
            f"Make sure the server is running and accesible at {self._service_url}"
        )
        ex = ApiException(status="ConnectionError", message=message)
        print(ex)
        raise ex
    return response

Issues:

  2. In indexify/executor/function_worker.py, the async_submit method:
async def async_submit(
    self,
    namespace: str,
    graph_name: str,
    fn_name: str,
    input: IndexifyData,
    code_path: str,
    version: int,
    init_value: Optional[IndexifyData] = None,
) -> FunctionWorkerOutput:
    try:
        result = await asyncio.get_running_loop().run_in_executor(
            self._executor,
            _run_function,
            namespace,
            graph_name,
            fn_name,
            input,
            code_path,
            version,
            init_value,
        )
    except BrokenProcessPool as mp:
        self._executor.shutdown(wait=True, cancel_futures=True)
        traceback.print_exc()
        raise mp
    except FunctionRunException as e:
        print(e)
        print(traceback.format_exc())
        return FunctionWorkerOutput(
            exception=str(e),
            stdout=e.stdout,
            stderr=e.stderr,
            reducer=e.is_reducer,
            success=False,
        )

Issues:

  3. In indexify/executor/agent.py, the task_completion_reporter method:
async def task_completion_reporter(self):
    console.print(Text("Starting task completion reporter", style="bold cyan"))
    url = f"{self._protocol}://{self._server_addr}/write_content"
    while True:
        outcomes = await self._task_store.task_outcomes()
        for task_outcome in outcomes:
            # ... (omitted for brevity)
            try:
                self._task_reporter.report_task_outcome(completed_task=task_outcome)
            except Exception as e:
                console.print(
                    Panel(
                        f"Failed to report task {task_outcome.task.id}\n"
                        f"Exception: {e}\nRetrying...",
                        title="Reporting Error",
                        border_style="error",
                    )
                )
                await asyncio.sleep(5)
                continue

Issues:

Proposed Solution

  1. Create a custom exception hierarchy:

    • Implement a base IndexifyException class.
    • Create specific exception subclasses for different types of errors (e.g., NetworkError, ExecutionError, ConfigurationError).
  2. Implement a centralized error handling and logging mechanism:

    • Create an ErrorHandler class that can be configured with custom logging and reporting options.
    • Use this ErrorHandler consistently throughout the SDK.
  3. Enhance error context:

    • Modify exception classes to include more context (e.g., function name, input data summary, graph details).
    • Implement a method to safely serialize error context, avoiding potential issues with unpicklable objects.
  4. Improve retry mechanisms:

    • Implement an exponential backoff strategy for retries.
    • Allow configuration of retry attempts and conditions.
  5. Add error callback support:

    • Allow users to register custom error callbacks for specific types of errors.
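A minimal sketch of items 1 and 3, assuming nothing about the final design. The class names come from the proposal above; the `context` attribute and the `safe_context` method are hypothetical names chosen for illustration. Context values are reduced to short strings so that unpicklable or non-JSON objects cannot break error reporting.

```python
import json
from typing import Any, Dict, Optional


class IndexifyException(Exception):
    """Base class for SDK errors (name taken from the proposal)."""

    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
        super().__init__(message)
        self.message = message
        # Hypothetical attribute: free-form details such as fn_name or graph_name.
        self.context = context or {}

    def safe_context(self, max_len: int = 200) -> Dict[str, str]:
        """Return the context with every value reduced to a truncated string."""
        safe: Dict[str, str] = {}
        for key, value in self.context.items():
            try:
                text = json.dumps(value)
            except (TypeError, ValueError):
                # Not JSON-serializable: fall back to repr() instead of failing.
                text = repr(value)
            safe[key] = text[:max_len]
        return safe


class NetworkError(IndexifyException):
    """Connection failures and HTTP-level errors."""


class ExecutionError(IndexifyException):
    """Failures while running a user function."""


class ConfigurationError(IndexifyException):
    """Invalid or missing client configuration."""
```

Callers could then catch `IndexifyException` to handle every SDK error, or a subclass to handle one category, and log `safe_context()` without worrying about what the context holds.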

Implementation Plan

  1. Define the exception hierarchy in a new file indexify/exceptions.py.
  2. Implement the ErrorHandler class in indexify/error_handling.py.
  3. Modify existing code to use the new exception classes and ErrorHandler:
    • Update remote_client.py to use specific exceptions and the ErrorHandler.
    • Refactor function_worker.py to provide more context in errors and use the ErrorHandler.
    • Enhance agent.py with improved error handling and retry logic.
  4. Add configuration options for error handling in the client initialization.
  5. Update documentation to reflect the new error handling capabilities.
  6. Add unit tests for the new error handling mechanisms.
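The handler, callback, and retry pieces of the plan could look roughly like the sketch below. Only the `ErrorHandler` name comes from the proposal; `register_callback`, `handle`, `retry_with_backoff`, and all parameter names are hypothetical. The example is synchronous and uses the built-in `ConnectionError` to stay self-contained.

```python
import logging
import time
from typing import Callable, Dict, List, Optional, Tuple, Type


class ErrorHandler:
    """Logs errors and dispatches them to user-registered callbacks."""

    def __init__(self, logger: Optional[logging.Logger] = None):
        self._logger = logger or logging.getLogger("indexify")
        self._callbacks: Dict[
            Type[BaseException], List[Callable[[BaseException], None]]
        ] = {}

    def register_callback(
        self,
        exc_type: Type[BaseException],
        callback: Callable[[BaseException], None],
    ) -> None:
        self._callbacks.setdefault(exc_type, []).append(callback)

    def handle(self, exc: BaseException) -> None:
        self._logger.error("%s: %s", type(exc).__name__, exc)
        # Run every callback whose registered type matches the exception.
        for exc_type, callbacks in self._callbacks.items():
            if isinstance(exc, exc_type):
                for callback in callbacks:
                    callback(exc)


def retry_with_backoff(
    fn: Callable[[], object],
    handler: ErrorHandler,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> object:
    """Call fn, doubling the delay after each failure up to max_delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            handler.handle(exc)
            if attempt == max_attempts:
                raise
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
```

With `base_delay=1.0` the waits are 1 s, 2 s, 4 s, and so on, capped at `max_delay`. Code that runs on the event loop, such as the reporter in agent.py, would need an async variant that awaits `asyncio.sleep` instead of calling a blocking sleep.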

stangirala commented 1 month ago

There are a few different types of errors. I guess these will surface when you make the changes.

For the worker and related code (anything that ends up touching the user's graph or code), does it make sense to capture only the task ID rather than a lot of data such as inputs? With the task ID we might be able to use the server blob store to retrieve inputs/outputs.
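One possible reading of this suggestion, as a sketch only: the worker records a small reference that carries the task ID and a truncated message, and leaves inputs and outputs to be looked up from the server by that ID. `TaskErrorRef` and `make_error_ref` are hypothetical names, not part of the SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskErrorRef:
    """Lightweight error record: identifies the task, carries no payloads."""

    task_id: str
    fn_name: str
    error_type: str
    message: str


def make_error_ref(
    task_id: str, fn_name: str, exc: BaseException, max_len: int = 500
) -> TaskErrorRef:
    # Only the exception type and a truncated message are kept; inputs and
    # outputs are assumed to be retrievable elsewhere using task_id.
    return TaskErrorRef(
        task_id=task_id,
        fn_name=fn_name,
        error_type=type(exc).__name__,
        message=str(exc)[:max_len],
    )
```

This keeps error records small and avoids serializing user data, at the cost of a second lookup when someone needs the inputs.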

PulkitMishra commented 1 month ago

@stangirala ah ok, I get the context that you were talking about now wrt #900. Few thoughts

PulkitMishra commented 1 month ago

related #909