hatchet-dev / hatchet

A distributed, fault-tolerant task queue
https://hatchet.run
MIT License

Duplicate Task Execution in Hatchet Worker on Timeout #1022

Open gustavobittencourt opened 1 week ago

gustavobittencourt commented 1 week ago

Description: I am experiencing duplicate task executions in my Hatchet worker whenever a task times out. To observe how Hatchet handles retries, I intentionally set low timeouts on the tasks in my workflow. Each timeout leads to duplicate executions, even after the task has moved on to the next step or completed successfully.

Expected Behavior: When a task times out and a retry is triggered, only the new attempt should execute. Any previous instance of the timed-out task should be canceled to prevent duplicate runs.
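The expected behavior can be sketched with plain asyncio (a hypothetical illustration, not Hatchet's internals): asyncio.wait_for cancels the running attempt when the timeout fires, so only the successful retry produces output.

```python
import asyncio

results = []

async def step(attempt_id: str, delay: float):
    await asyncio.sleep(delay)
    results.append(attempt_id)

async def run_with_retry():
    # First attempt is too slow and exceeds the timeout; the retry fits within it.
    for attempt_id, delay in [("attempt-1", 0.2), ("attempt-2", 0.02)]:
        try:
            # wait_for cancels the coroutine if it exceeds the timeout,
            # so the timed-out attempt cannot run to completion later.
            await asyncio.wait_for(step(attempt_id, delay), timeout=0.05)
            return
        except asyncio.TimeoutError:
            continue  # stale attempt was cancelled; move on to the retry

asyncio.run(run_with_retry())
print(results)  # only the retry's id is present
```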

Actual Behavior: When a timeout occurs, Hatchet retries the task as expected, but the worker ends up running duplicate instances of the same step. Below is an example from my logs showing timestamps that illustrate duplicate executions:

Example Logs:

17:41:52.014 - Step 1 (bd98a): It's gonna wait for 7 seconds...
17:41:56.011 - Step 1 (4fc5f): It's gonna wait for 1 seconds...
17:41:57.012 - Step 1 (4fc5f): Going to next step.
17:41:58.013 - Step 2 (4fc5f): Step 1 time was 1 second...
17:41:58.013 - Step 2 (4fc5f): It's gonna wait for 7 seconds...
17:41:59.014 - Step 1 (bd98a): Going to the next step.
17:42:02.013 - Step 2 (4fc5f): Step 1 time was 1 second...
17:42:02.013 - Step 2 (4fc5f): It's gonna wait for 9 seconds...
17:42:05.013 - Step 2 (4fc5f): Success!
17:42:11.013 - Step 2 (4fc5f): Success!

In this example:

  - Step 1 run bd98a starts at 17:41:52 and sleeps for 7 seconds, exceeding the 3-second timeout, so a retry (4fc5f) starts at 17:41:56.
  - The retry finishes and the workflow moves on through Step 2, yet the original bd98a run is never canceled: it prints "Going to the next step." at 17:41:59.
  - As a result, Step 2 executes twice, printing "Success!" at 17:42:05 and again at 17:42:11.

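The race in the logs above can be reproduced with a minimal asyncio sketch (hypothetical names, not Hatchet code): a scheduler that launches a retry after a timeout but never cancels the first attempt, so both attempts run to completion and "succeed".

```python
import asyncio

results = []

async def step(attempt_id: str, delay: float):
    await asyncio.sleep(delay)
    results.append(attempt_id)  # every attempt that finishes reports success

async def scheduler():
    timeout = 0.05
    # First attempt will exceed the timeout (sleeps 0.2s vs. 0.05s allowed).
    first = asyncio.ensure_future(step("bd98a", 0.2))
    await asyncio.sleep(timeout)
    if not first.done():
        # A retry is started, but `first` is NOT cancelled -- the bug.
        retry = asyncio.ensure_future(step("4fc5f", 0.02))
        await retry
    await first  # the original attempt finishes anyway

asyncio.run(scheduler())
print(results)  # both attempt ids appear -> duplicate execution
```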
Worker Code for Reproduction:

import asyncio
import random
import uuid
from datetime import datetime

from hatchet_sdk import Context
from src.hatchet import hatchet

def timestamp():
    return datetime.now().strftime("%H:%M:%S.%f")[:-3]

@hatchet.workflow(on_events=["create_timeout"])
class TimeoutWorkflow:
    @hatchet.step(name="Step 1", timeout="3s", retries=10)
    async def step_1(self, context: Context):
        my_id = str(uuid.uuid4())[:5]
        time = random.randint(1, 10)
        print(
            f"{timestamp()} - Step 1 ({my_id}): It's gonna wait for {time} seconds..."
        )

        await asyncio.sleep(time)

        print(f"{timestamp()} - Step 1 ({my_id}): Going to the next step.")

        return {"my_id": my_id, "time": time}

    @hatchet.step(name="Step 2", parents=["step_1"], timeout="3s", retries=10)
    async def step_2(self, context: Context):
        time_1 = context.step_output("step_1")["time"]
        my_id = context.step_output("step_1")["my_id"]
        print(f"{timestamp()} - Step 2 ({my_id}): Step 1 time was {time_1} seconds...")

        time = random.randint(1, 10)
        print(
            f"{timestamp()} - Step 2 ({my_id}): It's gonna wait for {time} seconds..."
        )

        await asyncio.sleep(time)

        print(f"{timestamp()} - Step 2 ({my_id}): Success!")

Configuration Details:

Steps to Reproduce:

  1. Set up a Hatchet worker with retries and low timeout thresholds.
  2. Trigger the workflow, intentionally causing a timeout to observe the retry behavior.
  3. Monitor the workflow run logs to observe duplicate task executions.

Questions:

Any insights or guidance on how to resolve this would be greatly appreciated! Thank you for your help.

grutt commented 1 week ago

Hi @gustavobittencourt, thanks for the detailed report. You appear to be 12 minor versions behind the latest SDK; v0.38.3 is the latest stable release.

Would you be able to try upgrading and report back whether you're still seeing the same behavior?

Also, are you running on Hatchet Cloud or Self Hosted? If Self Hosted, what version are you running?

gustavobittencourt commented 1 week ago

Hi @grutt, thanks for your response!

I just upgraded the hatchet_sdk to version 0.38.3 as suggested, and here’s my updated pyproject.toml:

[tool.poetry]
name = "src"
version = "0.0.0"
description = "Easily run background tasks in Python with Hatchet"
authors = []
readme = "README.md"

[tool.poetry.scripts]
hatchet = "src.main:start"

[tool.poetry.dependencies]
python = "^3.10"
hatchet-sdk = "^0.38.3"
python-dotenv = "^1.0.1"
openai = "^1.54.2"
beautifulsoup4 = "^4.12.3"
requests = "^2.32.3"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
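For reference, the upgrade itself can be done with Poetry (assuming the same pyproject.toml as above):

```shell
# Bump the hatchet-sdk constraint and install the new version
poetry add hatchet-sdk@^0.38.3
```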

I am running Hatchet locally based on the docker-compose.yml example from the Hatchet Self-Hosted documentation. I made a few changes, such as adjusting environment variables for the Postgres and RabbitMQ passwords, setting SERVER_GRPC_BROADCAST_ADDRESS: localhost:7077, and adding Watchtower to monitor container updates. All containers are set to the :latest tag, which currently points to version v0.50.4.

I verified the Hatchet component versions running in the containers:

/hatchet # ./hatchet-engine --version
v0.50.4
/hatchet # ./hatchet-api --version
v0.50.4

However, even after the upgrade, the issue with duplicate task executions persists. The log snippet below, captured after a timeout, still shows duplicate step executions:

python src/main.py
[INFO]  πŸͺ“ -- 2024-11-06 16:03:47,638 - ------------------------------------------
[INFO]  πŸͺ“ -- 2024-11-06 16:03:47,638 - STARTING HATCHET...
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:47,638 - worker runtime starting on PID: 17873
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:47,638 - using existing event loop
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:47,640 - action listener starting on PID: 17875
[INFO]  πŸͺ“ -- 2024-11-06 16:03:47,644 - starting runner...
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:47,644 - starting action listener health check...
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:47,645 - 'timeout-worker' waiting for ['timeoutworkflow:step_1', 'timeoutworkflow:step_2']
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:48,439 - starting action listener: timeout-worker
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:48,447 - acquired action listener: 2f4c3af2-1a22-4ed0-9c19-207fb813a3d3
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:48,447 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:52,450 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:03:56,453 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:00,455 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:04,458 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:08,460 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:12,463 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:16,466 - sending heartbeat
[INFO]  πŸͺ“ -- 2024-11-06 16:04:17,383 - rx: start step run: 257bc875-e983-4943-bd29-fec9eea1c818/timeoutworkflow:step_1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:17,383 - tx: event: timeoutworkflow:step_1/1
[INFO]  πŸͺ“ -- 2024-11-06 16:04:17,384 - run: start step: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
16:04:17.384611 - Step 1 (2de03): It's gonna wait for 10 seconds...
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:17,384 - tx: event: timeoutworkflow:step_1/1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:17,384 - start time: 0.0007128715515136719
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:20,468 - sending heartbeat
[INFO]  πŸͺ“ -- 2024-11-06 16:04:22,379 - rx: start step run: 257bc875-e983-4943-bd29-fec9eea1c818/timeoutworkflow:step_1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:22,379 - tx: event: timeoutworkflow:step_1/1
[INFO]  πŸͺ“ -- 2024-11-06 16:04:22,379 - run: start step: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
16:04:22.380169 - Step 1 (cf131): It's gonna wait for 8 seconds...
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:22,380 - tx: event: timeoutworkflow:step_1/1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:22,380 - start time: 0.00037407875061035156
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:24,471 - sending heartbeat
[INFO]  πŸͺ“ -- 2024-11-06 16:04:26,378 - rx: start step run: 257bc875-e983-4943-bd29-fec9eea1c818/timeoutworkflow:step_1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:26,378 - tx: event: timeoutworkflow:step_1/1
[INFO]  πŸͺ“ -- 2024-11-06 16:04:26,379 - run: start step: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
16:04:26.379536 - Step 1 (2699b): It's gonna wait for 4 seconds...
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:26,379 - tx: event: timeoutworkflow:step_1/1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:26,379 - start time: 0.0005140304565429688
16:04:27.385065 - Step 1 (2de03): Going to next step.
[INFO]  πŸͺ“ -- 2024-11-06 16:04:27,385 - finished step run: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:27,385 - tx: event: timeoutworkflow:step_1/2
[INFO]  πŸͺ“ -- 2024-11-06 16:04:28,379 - rx: start step run: 23942a5c-b089-4ded-9525-7d31cc4a5f9a/timeoutworkflow:step_2
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:28,380 - tx: event: timeoutworkflow:step_2/1
[INFO]  πŸͺ“ -- 2024-11-06 16:04:28,380 - run: start step: timeoutworkflow:step_2/23942a5c-b089-4ded-9525-7d31cc4a5f9a
16:04:28.380428 - Step 2 (2de03): Step 1 time was 10 seconds...
16:04:28.380465 - Step 2 (2de03): It's gonna wait for 5 seconds...
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:28,380 - tx: event: timeoutworkflow:step_2/1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:28,380 - start time: 0.0004401206970214844
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:28,474 - sending heartbeat
16:04:30.379875 - Step 1 (2699b): Going to next step.
16:04:30.379918 - Step 1 (cf131): Going to next step.
[INFO]  πŸͺ“ -- 2024-11-06 16:04:30,379 - finished step run: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
[INFO]  πŸͺ“ -- 2024-11-06 16:04:30,380 - finished step run: timeoutworkflow:step_1/257bc875-e983-4943-bd29-fec9eea1c818
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:30,380 - tx: event: timeoutworkflow:step_1/2
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:30,380 - tx: event: timeoutworkflow:step_1/2
[INFO]  πŸͺ“ -- 2024-11-06 16:04:32,377 - rx: start step run: 23942a5c-b089-4ded-9525-7d31cc4a5f9a/timeoutworkflow:step_2
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:32,377 - tx: event: timeoutworkflow:step_2/1
[INFO]  πŸͺ“ -- 2024-11-06 16:04:32,377 - run: start step: timeoutworkflow:step_2/23942a5c-b089-4ded-9525-7d31cc4a5f9a
16:04:32.377829 - Step 2 (2de03): Step 1 time was 10 seconds...
16:04:32.377877 - Step 2 (2de03): It's gonna wait for 6 seconds...
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:32,378 - tx: event: timeoutworkflow:step_2/1
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:32,378 - start time: 0.0004818439483642578
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:32,476 - sending heartbeat
16:04:33.380391 - Step 2 (2de03): Success!
[INFO]  πŸͺ“ -- 2024-11-06 16:04:33,380 - finished step run: timeoutworkflow:step_2/23942a5c-b089-4ded-9525-7d31cc4a5f9a
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:33,380 - tx: event: timeoutworkflow:step_2/2
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:36,479 - sending heartbeat
16:04:38.377678 - Step 2 (2de03): Success!
[INFO]  πŸͺ“ -- 2024-11-06 16:04:38,377 - finished step run: timeoutworkflow:step_2/23942a5c-b089-4ded-9525-7d31cc4a5f9a
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:38,378 - tx: event: timeoutworkflow:step_2/2
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:40,481 - sending heartbeat
[DEBUG] πŸͺ“ -- 2024-11-06 16:04:44,484 - sending heartbeat

Here is the workflow run screenshot on Hatchet dashboard:


Please let me know if there are any other recommendations or configuration changes I should try.

Thank you!