
Prefect Retry Task - Not retrying from point of failure #16059

Open cleung625 opened 2 weeks ago

cleung625 commented 2 weeks ago

Bug summary

Introduction

Hello! We recently ran into an odd issue in production: when we retry a failed deployment run, the code is not rerun from the point of failure. While debugging and testing, I found two different retry behaviors.

Set Up

Code

Here is some code that reproduces the bug:

Behavior 1 - Using persist_result=True, retries restart from task_A regardless

import asyncio

from prefect import flow, get_run_logger, task

@task(persist_result=True)
async def task_A():
    return 1

@task(persist_result=True)
async def task_B():
    return 2

@task(persist_result=True)
async def task_C():
    # Deliberately fail so the retry behavior can be observed
    raise ValueError("This is an error")

@task(persist_result=True)
async def task_D():
    return 4

@flow(persist_result=True)
async def test_retry_python(var_a, var_b):
    logger = get_run_logger()
    logger.info(f"Now Running: {var_a} and {var_b}")

    a = await task_A.submit()
    b = await task_B.submit(wait_for=[a])
    c = await task_C.submit(wait_for=[b])
    d = await task_D.submit(wait_for=[c])

    logger.info(f"Result: {d}")

    return d

if __name__ == "__main__":
    asyncio.run(test_retry_python(var_a=2, var_b=3))
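
For reference, task input caching is the mechanism the Prefect docs describe for skipping work that already completed when a run is retried. Below is a minimal sketch (task_input_hash is a real helper from prefect.tasks; the configuration is illustrative and we have not verified whether it changes the behavior above):

from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

# Sketch only: a cached task should be skipped on retry as long as the
# cache key matches and the persisted result is still reachable.
@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
async def task_A():
    return 1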

Result - Behavior 1

Behavior 2 - persist_result is not set, but results are persisted by default and retries pick up from the last failure point. However, this only happens when the worker is on the same machine that ran the first attempt.

Code - Behavior 2

import asyncio
import os
import time
from typing import Literal

from prefect import flow
from prefect.logging import get_run_logger
from prefect.runtime import flow_run
from prefect_libs import ccl_handling, ccl_kdb, ccl_kdb_setup

DROP_DOWN = Literal["1", "2", "3", "4"]

@flow(
    flow_run_name="test_rery" + time.strftime("%Y-%m-%d"),
    on_failure=[ccl_handling.hook],  # Replace with actual error hook
    log_prints=True,
)
async def task_retry(
    env,
    start: DROP_DOWN = "1",
    stop: DROP_DOWN = "4",
    email_recipients=None,
):
    logger = get_run_logger()
    current_time = time.ctime()

    logger.info(f"Now running process: {flow_run.name}")
    logger.info(f"Current time: {current_time!s}")
    logger.info(f"Environment: {env}")
    logger.info(f"User: {os.getlogin()}")

    hmaster_port_number = await ccl_kdb_setup.grab_hmaster_port(env, "access")
    conn = await ccl_kdb_setup.grab_port_handle.submit(env, hmaster_port_number)

    # Task order

    tasks_order = ["1", "2", "3", "4"]

    # Get the subset of tasks to run using the helper function
    tasks_to_run = ccl_kdb.get_tasks_to_run(tasks_order, start, stop)

    # Initialize an empty dictionary to store task results
    task_functions = {}

    # Use the helper function to run the tasks
    # Task 1: Wait for handles
    await ccl_kdb.run_task_with_start_stop(
        task_name="1",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="1",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
    )

    # Task 2
    await ccl_kdb.run_task_with_start_stop(
        task_name="2",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="2",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        wait_for_tasks=["1"],
        tags=["test"],
    )

    # Task 3: the malformed script ("'I am an error") deliberately triggers a
    # failure so retries kick in (5 retries, 10 seconds apart)
    await ccl_kdb.run_task_with_start_stop(
        task_name="3",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="'I am an error",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        retries=5,
        retry_delay_seconds=10,
        tags=["test"],
        wait_for_tasks=["2"],
    )

    # Task 4
    await ccl_kdb.run_task_with_start_stop(
        task_name="4",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="4",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
        wait_for_tasks=["3"],
    )

if __name__ == "__main__":
    asyncio.run(
        task_retry(
            env="qprod",
            start="1",
            stop="4",
            email_recipients=["cleung@cclgroup.com", "aliceroberts@cclgroup.com"],
        )
    )

Result - Behavior 2

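One possible explanation for the same-machine restriction (an assumption on our part, not something we have confirmed): with no result_storage configured, persisted results default to the local filesystem, which a worker on another machine cannot read. The default location can be inspected like this:

# Sketch: print where Prefect persists results when no result_storage
# block is configured (typically ~/.prefect/storage, local to the machine).
from prefect.settings import PREFECT_LOCAL_STORAGE_PATH

print(PREFECT_LOCAL_STORAGE_PATH.value())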

Conclusion

Retry behavior seems to vary depending on the setup. Am I using this incorrectly, or is something else going on? I asked about this multiple times on Slack but got no response. Thanks!
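
If local default storage is indeed the cause, pointing result storage at a shared location might let retries resume on any machine. A minimal sketch, assuming an S3 filesystem block named "shared-results" has already been created (the block name is hypothetical; any shared storage block should work):

from prefect import flow, task
from prefect.filesystems import S3

# Hypothetical block: create it first, e.g.
#   S3(bucket_path="my-bucket/prefect-results").save("shared-results")
shared_storage = S3.load("shared-results")

@task(persist_result=True, result_storage=shared_storage)
async def task_A():
    return 1

@flow(persist_result=True, result_storage=shared_storage)
async def test_retry_python():
    a = await task_A.submit()
    return await a.result()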

Version info

(C:\prefect-venv) C:\Users\qtask>prefect version
Version:             2.18.3
API version:         0.8.4
Python version:      3.9.19
Git commit:          c449aee8
Built:               Thu, May 2, 2024 5:47 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server

Additional context

No response

cleung625 commented 1 day ago

Hey team, any updates on this? This affects our ability to scale across different machines.