temporalio / sdk-python

Temporal Python SDK
MIT License

MultiProcess executor not working as expected #654

Closed giuliohome closed 1 month ago

giuliohome commented 1 month ago

What are you really trying to do?

This is a follow-up to this message. In particular, I'm simply running this official sample: https://github.com/temporalio/samples-python/blob/main/hello/hello_activity_multiprocess.py

Describe the bug

I see this output:

(.venv) C:\Users\giulio365cloud\dev-test>python multiprocessor_giulio.py
Heartbeating activity on PID 5104
Heartbeating activity on PID 5104
Heartbeating activity on PID 5104
Heartbeating activity on PID 5104
Heartbeating activity on PID 5104
Heartbeating activity on PID 5104
Result on PID 7072: Hello, World from activity on PID 5104! and then Hello, World from activity on PID 5104!

I would have expected to see a different PID when the activity runs the second time!

Minimal Reproduction

I took the official sample and simply ran the activity twice, one after the other, inside the workflow:

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        ret1 = await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
            # Always set a heartbeat timeout for long-running activities
            heartbeat_timeout=timedelta(seconds=2),
        )
        ret2 = await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
            # Always set a heartbeat timeout for long-running activities
            heartbeat_timeout=timedelta(seconds=2),
        )
        return ret1 + " and then " + ret2

I also modified the activity to include its PID in the greeting message:

@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
    # We'll wait for 3 seconds, heartbeating in between (like all long-running
    # activities should do), then return the greeting
    for _ in range(0, 3):
        print(f"Heartbeating activity on PID {os.getpid()}")
        activity.heartbeat()
        time.sleep(1)
    return f"{input.greeting}, {input.name} from activity on PID {os.getpid()}!"

Environment/Versions

(screenshot attached in the original issue)
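The repeated PID in the output above is consistent with how a process pool hands out work. The following is a minimal sketch, assuming the worker runs synchronous activities on a `concurrent.futures.ProcessPoolExecutor`, as the linked sample does. It uses only the standard library, not the Temporal SDK, and `report_pid`, `run_sequentially`, and `run_overlapping` are hypothetical names introduced for illustration. When each task finishes before the next one is submitted, the pool is free to give the next task to the same idle worker (though it is not guaranteed to). When tasks overlap in time, each busy worker can hold only one task, so the tasks land on different processes.

```python
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
from typing import List, Optional


def report_pid(delay: float) -> int:
    """Stand-in for a synchronous activity: block, then return the worker PID."""
    time.sleep(delay)
    return os.getpid()


def run_sequentially(
    n: int,
    delay: float = 0.2,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
) -> List[int]:
    """Submit one task at a time, waiting for each result before the next."""
    with ProcessPoolExecutor(max_workers=n, mp_context=mp_context) as pool:
        return [pool.submit(report_pid, delay).result() for _ in range(n)]


def run_overlapping(
    n: int,
    delay: float = 1.0,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
) -> List[int]:
    """Submit all tasks up front so they are in flight at the same time."""
    with ProcessPoolExecutor(max_workers=n, mp_context=mp_context) as pool:
        futures = [pool.submit(report_pid, delay) for _ in range(n)]
        return [future.result() for future in futures]


if __name__ == "__main__":
    # The guard matters on Windows and macOS, where worker processes are
    # spawned and re-import this module.
    print("sequential: ", run_sequentially(2))
    print("overlapping:", run_overlapping(2))
```

Run as a script, the second line should list two different PIDs. The first line may show the same PID twice or two different ones, depending on the platform and Python version.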

giuliohome commented 1 month ago

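Hmm, I see: the two activities need to run in parallel. Awaiting them one after the other means only one is in flight at a time, so the same worker process can serve both.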

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        task1 = asyncio.create_task(
            workflow.execute_activity(
                compose_greeting,
                ComposeGreetingInput("Hello", name),
                start_to_close_timeout=timedelta(seconds=10),
                # Always set a heartbeat timeout for long-running activities
                heartbeat_timeout=timedelta(seconds=2),
            )
        )
        task2 = asyncio.create_task(
            workflow.execute_activity(
                compose_greeting,
                ComposeGreetingInput("Hello", name),
                start_to_close_timeout=timedelta(seconds=10),
                # Always set a heartbeat timeout for long-running activities
                heartbeat_timeout=timedelta(seconds=2),
            )
        )
        ret1 = await task1
        ret2 = await task2
        return ret1 + " and then " + ret2

With the activities running in parallel, the output shows two different PIDs:

python multiprocessor_giulio.py
Heartbeating activity on PID 4952
Heartbeating activity on PID 5904
Heartbeating activity on PID 4952
Heartbeating activity on PID 5904
Heartbeating activity on PID 4952
Heartbeating activity on PID 5904
Result on PID 11484: Hello, World from activity on PID 4952! and then Hello, World from activity on PID 5904!
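The interleaved heartbeats above show the two activities overlapping in time. The sketch below illustrates the same difference between sequential and concurrent awaits using plain asyncio, with `asyncio.gather` as an alternative spelling of the two `create_task` calls. `fake_activity` is a hypothetical stand-in for `workflow.execute_activity(...)`, and nothing here touches the Temporal SDK. It is an assumption that the workflow event loop treats `asyncio.gather` the same way it treats `asyncio.create_task`.

```python
import asyncio
import time
from typing import Coroutine, List


async def fake_activity(name: str, seconds: float) -> str:
    """Hypothetical stand-in for an activity call that takes `seconds` to finish."""
    await asyncio.sleep(seconds)
    return f"Hello, {name}!"


async def run_sequential(seconds: float) -> List[str]:
    # Each await completes before the next call even starts.
    first = await fake_activity("World", seconds)
    second = await fake_activity("World", seconds)
    return [first, second]


async def run_parallel(seconds: float) -> List[str]:
    # gather schedules both awaitables at once and returns results in call order.
    results = await asyncio.gather(
        fake_activity("World", seconds),
        fake_activity("World", seconds),
    )
    return list(results)


def timed(coro: Coroutine) -> float:
    """Run a coroutine to completion and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {timed(run_sequential(0.5)):.2f}s")
    print(f"parallel:   {timed(run_parallel(0.5)):.2f}s")
```

The sequential version takes roughly twice as long as the parallel one, which mirrors why only the parallel workflow keeps two worker processes busy at once.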