temporalio / sdk-python

Temporal Python SDK
MIT License
457 stars 68 forks source link

[Bug] Child workflow execution stuck when running in testing.WorkflowEnvironment and workflow update is involved. #661

Open Chengdyc opened 6 hours ago

Chengdyc commented 6 hours ago

What are you really trying to do?

We're using Temporal to build a chat application, and we have a parent-child workflow setup that models the conversation (session) and each conversation turn (request + response). The conversation is a long-running entity workflow, and each input is submitted via a workflow update, which starts a child workflow to generate the response.

We want to write a unit test for the conversation workflow to verify its behavior. In the unit test, we mock the child workflow so that we don't need to mock the logic and activities within it.

Describe the bug

When we run the unit test, the test gets stuck starting the child workflow. We added print statements before calling execute_child_workflow and at the start of the child workflow. It appears that the Temporal test environment is unable to start the child workflow.

Minimal Reproduction

Unit tests to reproduce the bug. There are three test cases:

  1. Send a workflow update to a parent workflow that starts a child workflow. This test gets stuck; remove the pytest.mark.skip decorator to run it.
  2. Send a signal to a parent workflow that starts a child workflow. This test succeeds.
  3. Send a workflow update to a workflow that starts an activity (no child workflow involved). This test succeeds.

I'm using Temporal Python SDK version 1.7.1

import asyncio
import logging
import uuid
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

# Standard-library logger named after this module, used by the tests below.
logger = logging.getLogger(__name__)

@workflow.defn
class ParentWorkflow:
    """Entity-style parent workflow that runs one child workflow per input.

    Input arrives through either the ``process_update`` update or the
    ``signal`` signal. The main loop waits for input, runs ``ChildWorkflow``
    on this workflow's own task queue, stores the child's result in
    ``self.output`` and then clears ``self.input``. The workflow returns
    ``None`` once 60 minutes pass without new input.
    """

    def __init__(self) -> None:
        # Input waiting to be handed to the next child run; None when idle.
        self.input = None
        # Result of the latest child run; reset to None whenever new input arrives.
        self.output = None

    @workflow.run
    async def run(self):
        """Loop: wait for input, run a child workflow on it, repeat."""
        while True:
            try:
                await workflow.wait_condition(
                    lambda: self.input is not None,
                    timeout=timedelta(minutes=60)
                )
            except asyncio.TimeoutError:
                # wait_condition signals its timeout with asyncio.TimeoutError.
                # Before Python 3.11 that is NOT the builtin TimeoutError, so
                # catching the builtin would let the error escape the workflow
                # instead of returning cleanly here.
                workflow.logger.info("we're done")
                return

            workflow.logger.info("exited wait, starting child workflow")
            self.output = await workflow.execute_child_workflow(
                ChildWorkflow.run,
                args=[self.input],
                task_queue=workflow.info().task_queue,
            )
            workflow.logger.info("child workflow complete, got output")
            self.input = None

    @workflow.update
    async def process_update(self, input: str):
        """Submit *input* and wait until the main loop publishes an output.

        Returns ``self.output`` once the main loop has stored the child
        workflow's result in it.
        """
        self.input = input
        self.output = None
        # Use workflow.logger (as run() does) rather than the module-level
        # logger, so all workflow code logs the same way.
        workflow.logger.info("waiting for output")
        await workflow.wait_condition(
            lambda: self.output is not None
        )
        return self.output

    @workflow.signal
    async def signal(self, input: str):
        """Submit *input* without waiting for a result."""
        self.input = input
        self.output = None

@workflow.defn
class ChildlessWorkflow:
    """Variant of the parent workflow that runs an activity instead of a child.

    Input arrives through the ``process_update`` update. The main loop waits
    for input, runs ``Activities.child_activity`` on this workflow's own task
    queue, stores the activity result in ``self.output`` and then clears
    ``self.input``. The workflow returns ``None`` once 60 minutes pass
    without new input.
    """

    def __init__(self) -> None:
        # Input waiting to be handed to the next activity run; None when idle.
        self.input = None
        # Result of the latest activity run; reset to None whenever new input arrives.
        self.output = None

    @workflow.run
    async def run(self):
        """Loop: wait for input, run the activity on it, repeat."""
        while True:
            try:
                await workflow.wait_condition(
                    lambda: self.input is not None,
                    timeout=timedelta(minutes=60)
                )
            except asyncio.TimeoutError:
                # wait_condition signals its timeout with asyncio.TimeoutError.
                # Before Python 3.11 that is NOT the builtin TimeoutError, so
                # catching the builtin would let the error escape the workflow
                # instead of returning cleanly here.
                workflow.logger.info("we're done")
                return

            workflow.logger.info("exited wait, starting activity")
            self.output = await workflow.execute_activity_method(
                Activities.child_activity,
                args=[self.input],
                task_queue=workflow.info().task_queue,
                schedule_to_close_timeout=timedelta(
                    seconds=60
                ),
                # Single attempt: an activity failure surfaces immediately.
                retry_policy=RetryPolicy(maximum_attempts=1)
            )
            # This workflow runs an activity, not a child workflow; the log
            # message says so.
            workflow.logger.info("activity complete, got output")
            self.input = None

    @workflow.update
    async def process_update(self, input: str):
        """Submit *input* and wait until the main loop publishes an output.

        Returns ``self.output`` once the main loop has stored the activity's
        result in it.
        """
        self.input = input
        self.output = None
        # Use workflow.logger (as run() does) rather than the module-level
        # logger, so all workflow code logs the same way.
        workflow.logger.info("waiting for output")
        await workflow.wait_condition(
            lambda: self.output is not None
        )
        return self.output

class Activities:
    """Holder for the activity that the workflows in this module execute."""

    @activity.defn
    async def child_activity(self, name: str) -> str:
        """Return the string ``"child activity <name>"``."""
        return "child activity {}".format(name)

@workflow.defn
class ChildWorkflow:
    """Real child workflow: runs ``child_activity`` once and returns its result."""

    @workflow.run
    async def run(self, name: str):
        """Execute ``Activities.child_activity(name)`` on this workflow's task queue."""
        own_queue = workflow.info().task_queue
        # A single attempt: an activity failure is not retried.
        single_attempt = RetryPolicy(maximum_attempts=1)
        return await workflow.execute_activity_method(
            Activities.child_activity,
            args=[name],
            task_queue=own_queue,
            schedule_to_close_timeout=timedelta(minutes=1),
            retry_policy=single_attempt,
        )

@workflow.defn(name="ChildWorkflow")
class MockChildWorkflow:
    """Test stand-in registered under the workflow type name "ChildWorkflow".

    Because it shares the real child's type name, a worker that registers
    this class (instead of ``ChildWorkflow``) serves the parent's
    ``execute_child_workflow(ChildWorkflow.run, ...)`` calls with this mock.
    """

    @workflow.run
    async def run(self, name: str):
        # NOTE(review): this is a 60-second workflow timer. Under the
        # time-skipping test environment it is presumably only fast-forwarded
        # while time skipping is unlocked (e.g. while a client awaits a
        # workflow result); confirm whether that is why the update-based
        # test appears stuck.
        delay_seconds = 60
        await asyncio.sleep(delay_seconds)
        return "mock {} done".format(name)

# Apply the asyncio marker (pytest-asyncio plugin) to every test coroutine in this module.
pytestmark = pytest.mark.asyncio

@pytest.mark.skip(
    reason="test will get stuck"
)
async def test_workflow_update():
    """Reproduce the hang: a workflow update whose handler waits on a child workflow.

    ``ParentWorkflow.process_update`` only returns after the mocked child
    workflow completes, so ``execute_update`` blocks until then. As reported,
    this test gets stuck, hence the skip marker.

    NOTE(review): ``MockChildWorkflow`` sleeps for 60 seconds. The
    time-skipping test server appears to fast-forward timers only while time
    skipping is unlocked (for example while awaiting ``handle.result()``).
    Awaiting ``execute_update`` may not unlock it, in which case that timer
    runs in real time. Confirm whether this explains the hang.
    """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ParentWorkflow, MockChildWorkflow],
            activities=[activities.child_activity],
        ):
            handle = await env.client.start_workflow(
                ParentWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("executing update")
            update_result = await handle.execute_update(
                ParentWorkflow.process_update,
                args=["my name"]
            )
            logger.info("got result: %s, waiting for termination", update_result)
            # The update returns whatever the mocked child workflow returned.
            assert update_result == "mock my name done"
            # Awaiting the result lets the time-skipping server skip past the
            # parent's 60-minute idle timeout.
            result = await handle.result()
            print(result)

async def test_workflow_signal():
    """Control case: drive the same child-workflow path with a signal instead of an update.

    Unlike ``test_workflow_update``, this test completes.
    """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ParentWorkflow, MockChildWorkflow],
            activities=[activities.child_activity],
        ):
            handle = await env.client.start_workflow(
                ParentWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("sending signal")
            # A signal only delivers input; handle.signal() returns None, so
            # there is no result to capture or log here.
            await handle.signal(
                ParentWorkflow.signal,
                args=["my name"]
            )
            logger.info("signal sent, waiting for termination")
            # Awaiting the result lets the time-skipping server skip past the
            # mocked child's sleep and the parent's idle timeout.
            result = await handle.result()
            print(result)

async def test_childless_workflow():
    """Control case: a workflow update whose handler waits on an activity, not a child.

    Unlike ``test_workflow_update``, this test completes.
    """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ChildlessWorkflow],
            activities=[activities.child_activity],
        ):
            handle = await env.client.start_workflow(
                ChildlessWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("executing update")
            update_result = await handle.execute_update(
                ChildlessWorkflow.process_update,
                args=["my name"]
            )
            logger.info("got result: %s, waiting for termination", update_result)
            # The update returns the activity's result for the submitted input.
            assert update_result == "child activity my name"
            # Awaiting the result lets the time-skipping server skip past the
            # workflow's 60-minute idle timeout.
            result = await handle.result()
            print(result)

Command to run this:

poetry run pytest -rA -s test_mock_child.py

Environment/Versions

Additional context