temporalio / sdk-python

Temporal Python SDK
MIT License
474 stars 77 forks source link

[Bug] asyncio.wait is non-deterministic when used with coroutines instead of tasks #429

Closed ntessman-capsule closed 6 months ago

ntessman-capsule commented 1 year ago

What are you really trying to do?

Split a list of incoming events into concurrently executed activity streams. The repro below mimics a stripped down version of the original code structure.

Describe the bug

When using asyncio.wait() with a list of async method calls, which contain a series of activity executions, an error can occur which results in later activities receiving the return values of different concurrent executions.

Here is a table to illustrate what I mean. Given an activity that executes f(x) -> x, if we pass the numbers 1-3 concurrently, we might see this:

Activity Input Expected Output Actual Output
1 1 1
2 2 3
3 3 2

This issue is random, and rare. The reproduction below is designed to maximize the chance of running into it.

Minimal Reproduction

Below is a self-contained reproduction of the issue. Using this input should encounter the error fairly consistently, a little less than once per run:

{"execution_iterations": 20,"activity_iterations": 5,"concurrency": 100,"wait_time": 0.1}

The temporal setup I used to reproduce this issue is a vanilla Temporal server install running via temporal server start-dev, with two worker instances running the below python file using python -m main. I primarily tested using Python 3.8.

When there is a mismatch, it is printed to the console.

import asyncio
from datetime import timedelta
from typing import List
from temporalio import workflow, activity
from temporalio.worker import Worker
from temporalio.client import Client

from dataclasses import dataclass

@dataclass
class EchoRequest:
    execution_iteration: int
    activity_iteration: int
    input: str
    wait_time: float

@dataclass
class EchoResponse:
    output: str

@activity.defn(name="echo")
async def echo(
    echo: EchoRequest
) -> EchoResponse:
    await asyncio.sleep(echo.wait_time)
    return EchoResponse(output=echo.input)

@dataclass
class RaceConditionIteration:
    execution_id: int
    iterations: int
    wait_time: float

@dataclass
class RaceConditionTestInput:
    execution_iterations: int
    activity_iterations: int
    concurrency: int
    wait_time: float

@activity.defn(name="race_condition_get_input_events")
async def race_condition_get_input_events(
    input: RaceConditionTestInput
) -> List[RaceConditionIteration]:
    return [
        RaceConditionIteration(
            execution_id=id,
            iterations=input.activity_iterations,
            wait_time=input.wait_time,
        )
        for id in range(input.concurrency)
    ]

@workflow.defn(name="RaceConditionTestWorkflow")
class RaceConditionTestWorkflow:
    @workflow.run
    async def run(self, event: RaceConditionTestInput) -> str:
        print('Starting workflow...')

        # Simulate a list of input events
        generated_input_data = await workflow.execute_activity(
            race_condition_get_input_events,
            event,
            start_to_close_timeout=timedelta(seconds=10),
            schedule_to_close_timeout=timedelta(seconds=60)
        )

        for iteration in range(event.execution_iterations):
            _, _ = await asyncio.wait(
                [
                    self.execute_activities(
                        iteration=iteration,
                        input=input
                    )
                    for input in generated_input_data
                ],
                return_when=asyncio.ALL_COMPLETED
            )

        return "Done."

    async def execute_activities(
        self,
        iteration: int,
        input: RaceConditionIteration,
    ) -> None:
        for iter in range(input.iterations):
            iter_id = f"{iteration}.{input.execution_id}.{iter}"
            result = await workflow.execute_activity(
                echo,
                EchoRequest(
                    execution_iteration=iteration,
                    activity_iteration=iter,
                    input=iter_id,
                    wait_time=input.wait_time
                ),
                start_to_close_timeout=timedelta(seconds=10),
                schedule_to_close_timeout=timedelta(seconds=60)
            )

            if iter_id != result.output:
                print(f"Expected: {iter_id}, Actual: {result.output}")

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="race-condition",
        workflows=[RaceConditionTestWorkflow],
        activities=[echo, race_condition_get_input_events],
    )

    print('Starting worker...')

    await worker.run()

if __name__ == '__main__':
    asyncio.run(main())

Environment/Versions

Additional context

It appears that this bug occurs after this warning is printed to the worker console:

2023-11-15T05:38:09.387580Z  WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc"} } run_id="eb9b8aed-c730-4fe6-a7e5-a772f6757be2"

I can't confirm whether this warning was also appearing when this issue was happening in the real code. It also appears to result in the workflow task restarting, which seems correlated with the determinism breakdown.

Additionally, it appears that this issue has indirectly been mitigated in 3.11, as asyncio.wait() no longer allows passing coroutines directly. When the method is wrapped in asyncio.create_task(), this issue disappears. The above warning is still printed, but doesn't result in disordered activity results.

This code change in the workflow removes the issue:


_, _ = await asyncio.wait(
    [
-      self.execute_activities(
+      asyncio.create_task(self.execute_activities(
            iteration=iteration,
            input=input
-       )
+       ))
        for input in generated_input_data
    ],
    return_when=asyncio.ALL_COMPLETED
)
HillaShx commented 1 year ago

I'm facing this same issue, it seems, by the logs. I, too, use asyncio for concurrent activity execution (by multiple workers separated by threads).

Environment/Versions OS and processor: M2 Mac, MacOS Ventura 13.5.1 Temporal SDK Versions: 1.3.0 Python: 3.9.6 Occurs within Temporal Cluster service deployment, with the workers deployed to an Azure Web App. LMK what other context might help in resolving this.

cretz commented 1 year ago

Hrmm, I was only able to get the mismatch after replay (replay was being forced due to overgrowing history size I think). I am still investigating here...

cretz commented 1 year ago

Turns out asyncio.wait is non-deterministic. Here is the problem: https://github.com/python/cpython/blob/967f2a3052c2d22e31564b428a9aa8cc63dc2a9f/Lib/asyncio/tasks.py#L443

This is not a Temporal-specific issue. What is the output of the following?

    async def do_something(name: str) -> None:
        print(f"Run {name}")
    await asyncio.wait([
        do_something("something1"),
        do_something("something2"),
        do_something("something3"),
    ])
    await asyncio.wait([
        do_something("something4"),
        do_something("something5"),
        do_something("something6"),
    ])

In 3.10, ignoring deprecation warnings, it may be something like (but it can change with only slight code alterations):

Run something1
Run something2
Run something3
Run something6
Run something4
Run something5

Why was that output not predictable?

Unfortunately, Python in asyncio.wait non-deterministically converts the input to a set (which unlike even dict, does not have any reliable ordering). This means the coroutines are not even started in the same order. This problem does not exist with start_activity because that is a sync function that returns an awaitable (so it is called when you invoke it not when awaited), but execute_activity is an async function (so it is called when it's awaited not when it's invoked). This means that foo = workflow.execute_activity(...) does nothing until await foo is run.

The reason this works with create_task is because create_task starts the coroutine immediately when create_task is run, so the coroutines run in a deterministic order.

So, basically we need to educate people that asyncio.wait is non-deterministic when used with coroutines instead of tasks (which isn't allowed in 3.11+). I was originally going to open a CPython issue to stop converting to a set (digging through git blame, this was done way back in at least 3.4), but that would only apply to new versions (doubt they'd cherry pick this), so you'd be forced to use tasks anyways.

Does this make sense? I will add a README note/warning. We could offer our own deterministic form of this function, but we'd probably require tasks anyways like Python does now. Any alternative suggestions or ways we can approach this?

ntessman-capsule commented 10 months ago

Thanks for your investigation @cretz. I'll leave this open for comment in case anyone has a better idea, but for my purposes this issue is resolved through the use of tasks. Personally, I think the only thing that can be done is provide a warning. It's a gotcha that is on Python's shoulders, not the SDK, and as you said it's unlikely that a fix will be applied retroactively.