temporalio / sdk-core

Core Temporal SDK that can be used as a base for language-specific Temporal SDKs

[Bug] Legacy queries may get lost #716

Closed. cretz closed this issue 3 months ago

cretz commented 4 months ago

Describe the bug

There is a rare but acknowledged server-side case where a legacy query is delivered to a worker for a run ID, but a new workflow task for that run arrives before the query is answered. After lots of debugging, it appears that https://github.com/temporalio/sdk-core/blob/409e74ec8e80ae4c1f9043e8b413b1371b65f946/core/src/worker/workflow/workflow_stream.rs#L468 overwrites the pending query, so it is never answered and the client's query eventually times out.
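
To make the suspected mechanism concrete, here is a purely illustrative Python sketch (the names BufferedTask, buffer_task, legacy_query, and history_update are made up; this is not the actual Rust in workflow_stream.rs) of a single-slot buffer where storing a new task silently drops a still-unanswered legacy query:

from dataclasses import dataclass
from typing import Optional

@dataclass
class BufferedTask:
    history_update: str
    legacy_query: Optional[str] = None

# Hypothetical per-run slot that only ever holds the latest buffered task
pending: Optional[BufferedTask] = None

def buffer_task(task: BufferedTask) -> None:
    global pending
    # The previous task's legacy_query (if any) is silently discarded here
    pending = task

buffer_task(BufferedTask(history_update="wft-1", legacy_query="signal_count"))
# A new task for the same run arrives before the query above is answered
buffer_task(BufferedTask(history_update="wft-2"))
assert pending is not None and pending.legacy_query is None  # the query was lost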

Minimal Reproduction

So this is admittedly hard to reproduce. I have seen the following Python script reproduce it after running for a short while:

import asyncio
import logging
from datetime import timedelta
from uuid import uuid4

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def wait_forever_activity() -> None:
    await asyncio.Future()

@workflow.defn
class WaitForeverWorkflow:
    @workflow.run
    async def run(self) -> None:
        await asyncio.Future()

@workflow.defn
class WeirdWorkflow:
    def __init__(self) -> None:
        self._signal_count = 0

    @workflow.run
    async def run(self) -> None:
        # Start several things in the background
        tasks = [
            asyncio.create_task(
                workflow.execute_activity(
                    wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
                )
            ),
            asyncio.create_task(
                workflow.execute_child_workflow(WaitForeverWorkflow.run)
            ),
            asyncio.create_task(asyncio.sleep(1000)),
            asyncio.shield(
                workflow.execute_activity(
                    wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
                )
            ),
            asyncio.create_task(workflow.wait_condition(lambda: False)),
        ]
        asyncio.gather(*tasks, return_exceptions=True)
        try:
            # Wait for signal count to reach 2
            await workflow.wait_condition(lambda: self._signal_count > 1)
        finally:
            # Wait for signal count to reach 3
            await workflow.wait_condition(lambda: self._signal_count > 2)

    @workflow.signal
    async def signal(self) -> None:
        self._signal_count += 1

    @workflow.query
    def signal_count(self) -> int:
        return self._signal_count

task_queue = f"tq-{uuid4()}"

async def run_workflow(client: Client) -> None:
    id = f"wf-{uuid4()}"
    print("Running workflow", id)
    handle = await client.start_workflow(
        WeirdWorkflow.run,
        id=id,
        task_queue=task_queue,
    )

    async def wait_for_signal_count(expected: int) -> None:
        # Just try 20 times every 100ms to get the signal count
        for _ in range(20):
            count = await handle.query(WeirdWorkflow.signal_count)
            if count == expected:
                return
            await asyncio.sleep(0.1)

    try:
        # Confirm signal count is 0
        await wait_for_signal_count(0)

        # Send signal and confirm it's at 1
        await handle.signal(WeirdWorkflow.signal)
        await wait_for_signal_count(1)

        await handle.signal(WeirdWorkflow.signal)
        await wait_for_signal_count(2)

        await handle.signal(WeirdWorkflow.signal)
        await wait_for_signal_count(3)

        await handle.result()
    except Exception as err:
        raise RuntimeError(f"Failed on workflow {id}") from err

async def main():
    # Enable logging so we can see what's happening (comment out the line below to silence it)
    logging.basicConfig(level=logging.INFO)
    logging.info(f"Starting on task queue {task_queue}")

    # Start client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[WeirdWorkflow, WaitForeverWorkflow],
        activities=[wait_forever_activity],
        # Disable cache
        max_cached_workflows=0,
    ):
        # We're gonna run 10 at a time forever until we get a query timeout
        while True:
            await asyncio.gather(*[run_workflow(client) for _ in range(10)])

if __name__ == "__main__":
    asyncio.run(main())
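
For completeness, the script assumes a Temporal server is reachable at localhost:7233; how you run that server is up to you (for example, a local dev server started with the temporal server start-dev CLI command works, but any reachable server does).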

I've found it is much more likely to happen on a CPU-overloaded machine, because the worker must not get a chance to answer the legacy query before the next workflow task comes in. This has been happening quite a bit in Python CI ever since we introduced our test_cache_eviction_tear_down test (roughly once every few runs).
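
If you want to bias the race on a machine that is not already busy, one option (my own suggestion, not part of the original repro) is to peg every core with a busy-looping process before running the script above, roughly like this:

import multiprocessing

def burn_cpu() -> None:
    # Busy-loop forever to keep one core pegged
    while True:
        pass

if __name__ == "__main__":
    # One busy process per core; daemon=True so they exit with this process
    for _ in range(multiprocessing.cpu_count()):
        multiprocessing.Process(target=burn_cpu, daemon=True).start()
    # ...then run the repro in this same process, e.g. asyncio.run(main())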