temporalio / sdk-python

Temporal Python SDK
MIT License

[Bug] NondeterminismError when replaying overlapping signals and updates #673

Open cbrewster opened 3 days ago

cbrewster commented 3 days ago

What are you really trying to do?

I have a workflow that runs various activities in response to signals and updates from the user. The workflow tracks its current state (running or stopped) with a counter of how many activities are currently running. There is a resume update that should start an activity only if no activity is already running.

I've implemented this, but during replay we see nondeterminism errors. I've made a minimal reproduction.

Describe the bug

We're seeing workflow replays that process events in a different order from the one in which they originally arrived:

Original order of events:

- start workflow
- start timer
- signal start
- start activity 1 (state = "running", running_count = 1)
- activity 1 finishes (state = "stopped", running_count = 0)
- update resume
- start activity 2 (state = "running", running_count = 1)
- activity 2 finishes (state = "stopped", running_count = 0)
- update returns
- timer completes, workflow done

During replay we get a different order, which causes the nondeterminism error (NDE):

- start workflow
- start timer
- signal start
- start activity 1 (state = "running", running_count = 1)
- update resume <--- This update is run before we process the completion of activity 1
- resume returns early because state is "running"
- activity 1 finishes (state = "stopped", running_count = 0)
- NDE because we didn't end up scheduling activity 2
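
To make the divergence concrete, here is a minimal plain-Python sketch of the two orderings above. It does not use the Temporal SDK at all: `CounterModel`, `run_events`, and the event names are hypothetical stand-ins for the handler logic in the reproduction, and the `scheduled` list only approximates the "schedule activity" commands a workflow would emit.

```python
from typing import List


class CounterModel:
    """Hypothetical stand-in for the workflow's handler state (not Temporal code)."""

    def __init__(self) -> None:
        self.running_count = 0
        self.state = "stopped"
        # Names of the activities this model asked to schedule, in order.
        self.scheduled: List[str] = []

    def _start_activity(self, name: str) -> None:
        self.running_count += 1
        self.state = "running"
        self.scheduled.append(name)

    def signal_start(self) -> None:
        # The signal always starts an activity.
        self._start_activity("activity 1")

    def update_resume(self) -> None:
        # The update starts an activity only when nothing is running.
        if self.state == "running":
            return
        self._start_activity("activity 2")

    def activity_finished(self) -> None:
        self.running_count -= 1
        if self.running_count == 0:
            self.state = "stopped"


def run_events(order: List[str]) -> List[str]:
    """Apply events in the given order and return the scheduled activities."""
    model = CounterModel()
    for event in order:
        getattr(model, event)()
    return model.scheduled


# Order in which events were originally processed.
ORIGINAL = ["signal_start", "activity_finished", "update_resume", "activity_finished"]
# Order observed during replay: the update runs before activity 1 completes.
REPLAY = ["signal_start", "update_resume", "activity_finished"]

original_commands = run_events(ORIGINAL)
replay_commands = run_events(REPLAY)
print(original_commands)
print(replay_commands)
```

Under these assumptions, the original order schedules two activities while the replay order schedules only one, which is the mismatch that the replayer reports as a nondeterminism error.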

Minimal Reproduction

This reproduction runs the workflow and then automatically replays the workflow to demonstrate the nondeterminism error.

import asyncio
from datetime import timedelta
from uuid import uuid4

from temporalio import workflow
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Replayer, Worker

@activity.defn
async def test_activity() -> None:
    return

@workflow.defn
class TestWorkflow:
    def __init__(self) -> None:
        self.running_count = 0
        self.state = "stopped"

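    @workflow.run
    async def run(self) -> None:
        # Inside a workflow, asyncio.sleep is backed by a durable Temporal timer
        # (the "start timer" event in the lists above).
        await asyncio.sleep(1)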

    @workflow.signal
    async def start(self) -> None:
        await self.run_one()

    @workflow.update
    async def resume(self) -> None:
        if self.state == "running":
            return
        await self.run_one()

    async def run_one(self):
        self.running_count += 1
        self.state = "running"
        await workflow.execute_activity(
            test_activity,
            start_to_close_timeout=timedelta(seconds=1),
        )
        self.running_count -= 1
        if self.running_count == 0:
            self.state = "stopped"

async def main() -> None:
    id = str(uuid4())
    client = await Client.connect("localhost:7233")
    async with Worker(
        client=client,
        task_queue="test",
        workflows=[TestWorkflow],
        activities=[test_activity],
    ):
        workflow_handle = await client.start_workflow(
            TestWorkflow.run,
            id=id,
            task_queue="test",
        )
        await workflow_handle.signal(TestWorkflow.start)
        await asyncio.sleep(0.5)
        await workflow_handle.execute_update(TestWorkflow.resume)
        await workflow_handle.result()

    workflows = client.list_workflows(f"WorkflowId = '{id}'")
    histories = workflows.map_histories()
    replayer = Replayer(workflows=[TestWorkflow])
    await replayer.replay_workflows(histories)

if __name__ == "__main__":
    asyncio.run(main())

Environment/Versions

Additional Comments

Replacing the update with a signal instead does not have the same issue.

cretz commented 3 days ago

Thanks for the detailed report! If the activity completion was originally delivered before the update, it definitely should be delivered before the update on replay as well. We will likely treat this as high priority (granted, updates are not yet GA for these kinds of reasons).