temporalio / sdk-python

Temporal Python SDK
MIT License
457 stars 68 forks source link

[Bug] Exception in converter encoding doesn't propagate as expected #540

Closed assafz-q closed 1 month ago

assafz-q commented 4 months ago

What are you really trying to do?

I am writing a temporal workflow and I want to handle the exceptions that happen during the execution, I want the workflow to catch any unexpected error and fail the entire workflow in a non-retriable way.

Describe the bug

I am catching all the exceptions during workflow run and re-raise them as non-retriable ApplicationErrors and everything works as expected except for the case where the exception is thrown during the encoding of data to an activity.

Running a workflow that sends unserializable obj to an activity results in the following history (which doesn't contain any failure in it except for the timeout [2s]): image

and the following warning in the logs:

WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if exception is thrown in the middle of the workflow execution.

Minimal Reproduction

​import asyncio
import datetime
import uuid

import temporalio.activity
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.worker
import temporalio.workflow

#### activities
class SomeObj:  # non serializable by default so senfding it to activities should fail
    def __init__(self, value: str):
        self.value = value

@temporalio.activity.defn
async def some_activity(obj: SomeObj) -> str:
    return obj.value

#### workflow
@temporalio.workflow.defn
class FailingWorkflowExceptionDuringActivitySend:
    @temporalio.workflow.run
    async def run(self) -> str:
        try:
            obj = SomeObj("value")
            value = await temporalio.workflow.execute_activity(
                some_activity,
                obj,
                start_to_close_timeout=datetime.timedelta(seconds=10),
            )
            return value
        except Exception as e:
            raise temporalio.exceptions.ApplicationError(
                "Failed during activity send as expected", non_retryable=True
            ) from e

####

async def _main():
    client = await temporalio.client.Client.connect(
        target_host="localhost:7233",
    )

    task_queue = "task-queue-name"
    workflow = FailingWorkflowExceptionDuringActivitySend

    async with temporalio.worker.Worker(
        client,
        task_queue=task_queue,
        workflows=[workflow],
        activities=[some_activity],
    ):
        try:
            handle = await client.start_workflow(
                workflow.run,
                id=f"{workflow.__name__}-{uuid.uuid4().hex}",
                task_queue=task_queue,
                run_timeout=datetime.timedelta(seconds=2),
                execution_timeout=datetime.timedelta(seconds=2),
                retry_policy=temporalio.common.RetryPolicy(
                    maximum_attempts=1,
                ),
            )

            result = await handle.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Workflow: {workflow.__name__} failed:{e.__cause__}")

if __name__ == "__main__":
    asyncio.run(_main())

Environment/Versions

cretz commented 4 months ago

I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if exception is thrown in the middle of the workflow execution.

Common exceptions do not fail the workflow by default, they "suspend" it. This is the same way as if an exception is thrown in the middle of workflow execution. See https://github.com/temporalio/sdk-python?tab=readme-ov-file#exceptions. You can customize this behavior and/or customize the converter to throw certain exceptions.

assafz-q commented 4 months ago

I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if exception is thrown in the middle of the workflow execution.

Common exceptions do not fail the workflow by default, they "suspend" it. This is the same way as if an exception is thrown in the middle of workflow execution. See https://github.com/temporalio/sdk-python?tab=readme-ov-file#exceptions. You can customize this behavior and/or customize the converter to throw certain exceptions.

I know, that is why I catch the exception and raise it again as non-retriable ApplicationError.

cretz commented 4 months ago

Right, so the exception raised by the data converter is similar to raising the exception in the workflow, but it occurs on a different path (it occurs in the payload converter not the workflow code). It's a bit advanced, but you can customize the converter the same way you customize the workflow to catch and re-raise in a certain way. Alternatively to both, you can just put failure_exception_types=[Exception] or similar in the @workflow.defn and it should fail the workflow with any exception instead of failing the task (i.e. suspending pending code fix).

assafz-q commented 4 months ago

Right, so the exception raised by the data converter is similar to raising the exception in the workflow, but it occurs on a different path (it occurs in the payload converter not the workflow code). It's a bit advanced, but you can customize the converter the same way you customize the workflow to catch and re-raise in a certain way. Alternatively to both, you can just put failure_exception_types=[Exception] or similar in the @workflow.defn and it should fail the workflow with any exception instead of failing the task (i.e. suspending pending code fix).

Thanks! But I am able to catch the exception in the try except of the workflow so I think everything is as I expected. I think the problem is related to the warning log I attached to the issue:

WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

I was able to (maybe) fix that locally if in (temporalio.worker._workflow_instance._ActivityHandle._apply_schedule_command) https://github.com/temporalio/sdk-python/blob/365ceadb0732f36b66bf1ae2970131fc7944b565/temporalio/worker/_workflow_instance.py#L2145-L2215 I moved the part that handles the payloads: https://github.com/temporalio/sdk-python/blob/365ceadb0732f36b66bf1ae2970131fc7944b565/temporalio/worker/_workflow_instance.py#L2167-L2170 to the end of the function (_apply_schedule_command)

I am not too familiar with the code, but it seems like in that context the queue name is only initialized after the payload conversion, so if I had an exception during the payload conversion it could not be sent to the temporal server.

cretz commented 3 months ago

I was able to (maybe) fix that locally if in (temporalio.worker._workflow_instance._ActivityHandle._apply_schedule_command) to the end of the function (_apply_schedule_command)

I see the issue. The issue is that we create the command before we try to serialize the contents. So if this does not fail the workflow task, the command is sent off incomplete (because it threw an exception during building). I have opened https://github.com/temporalio/sdk-python/issues/564 to track (it is easier as a separate issue to state the problem clearly), but will keep this issue open.