temporalio / sdk-python

Temporal Python SDK
MIT License

[Bug] Workflow cancellation can be swallowed #620

Open cretz opened 3 months ago

cretz commented 3 months ago

Describe the bug

In Python, if cancellation is requested for a workflow that is running a short-lived activity, and that activity completes while the cancellation is being delivered (unsure whether the activity's cancellation type has to be set to wait), the cancellation is effectively swallowed. This applies to any Python asyncio code that uses cancellation: if the thing being awaited swallows the cancellation, it is as if it never happened.

This is probably just existing, unfortunate Python behavior that we have to document. We need to at least confirm whether it happens only with the wait-cancel cancellation type or with try-cancel too.
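To illustrate the general asyncio behavior described above, here is a minimal sketch in plain asyncio. It does not use the Temporal SDK and does not claim to reproduce the SDK's internal mechanism; `swallowing_inner`, `outer`, and `demo` are hypothetical names chosen for this example. It only shows that when the awaited code absorbs `asyncio.CancelledError` and returns normally, the cancelled task carries on as if no cancellation had been requested.

```python
import asyncio


async def swallowing_inner() -> str:
    """Stand-in for an awaitable that absorbs cancellation and still returns."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Swallow the cancellation and finish normally instead of re-raising.
        pass
    return "inner finished"


async def outer(log: list) -> None:
    # The cancel request is delivered into the inner await, which absorbs it,
    # so this coroutine never sees CancelledError and keeps running.
    log.append(await swallowing_inner())
    log.append("outer kept running")


async def demo() -> tuple:
    log: list = []
    task = asyncio.create_task(outer(log))
    await asyncio.sleep(0)  # let the task start and block on the inner sleep
    task.cancel()  # request cancellation
    await task  # completes normally; no CancelledError is raised here
    return task.cancelled(), log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch `task.cancelled()` is `False` and both log entries are recorded, so the caller has no direct sign that `cancel()` was ever called.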

lambyqq commented 3 months ago

Here's some test code that could reproduce this:

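import asyncio
import random
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from time import sleep

import pytest
from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError
from temporalio.exceptions import CancelledError
from temporalio.worker import Worker

@activity.defn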
async def short_activity_async():
   delay = random.uniform(0.05, 0.15)  # 50~150ms delay
   await asyncio.sleep(delay)

@activity.defn
def short_activity_sync():
   delay = random.uniform(0.05, 0.15)  # 50~150ms delay
   sleep(delay)

@workflow.defn
class ShortActivityWorkflow:
   @workflow.run
   async def run(self, total_seconds: float = 10.0):
       end = workflow.now() + timedelta(seconds=total_seconds)
       while True:
           await workflow.execute_activity(
               short_activity_async,
               schedule_to_close_timeout=timedelta(seconds=10))
           await workflow.execute_activity(
               short_activity_sync,
               schedule_to_close_timeout=timedelta(seconds=10))
           if workflow.now() > end:
               break

@pytest.mark.asyncio
async def test_workflow_cancellation():
   ...
   client: Client = ...
   async with Worker(
           client,
           task_queue='test-wf-cancellation-task-queue',
           workflows=[ShortActivityWorkflow],
           activities=[short_activity_async, short_activity_sync],
           activity_executor=ThreadPoolExecutor(max_workers=1)
   ):
       for i in range(10):
           wf_duration = random.uniform(5.0, 15.0)
           wf_handle = await client.start_workflow(
               ShortActivityWorkflow.run,
               id=f'short_activity_wf_id-{i}',
               args=[wf_duration],
               task_queue='test-wf-cancellation-task-queue',  # must match the Worker's task queue
               execution_timeout=timedelta(minutes=1)
           )

           # Cancel wf
           await asyncio.sleep(1.0)
           await wf_handle.cancel()

           with pytest.raises(WorkflowFailureError) as err_info:
               await wf_handle.result()  # failed
           cause = err_info.value.cause
           assert isinstance(cause, CancelledError)
           assert cause.message == 'Workflow cancelled'