PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.79k stars 1.55k forks source link

surpassing PREFECT_RUNNER_PROCESS_LIMIT crashes the `serve()` process #11093

Closed diego-lima closed 9 months ago

diego-lima commented 10 months ago

First check

Bug summary

There's a process limit setting (PREFECT_RUNNER_PROCESS_LIMIT) that doesn't allow more than a certain number of subprocesses running at a time.

Currently, it's easy to reach this limit while trying out the official Tasks tutorial and when this happens, the python process crashes with RuntimeError: this borrower is already holding one of this CapacityLimiter's tokens.

This is bad developer experience because I had to keep paying attention to my script and restarting it when it died. As a workaround I wrapped the serve() call in an infinite loop.

I could increase the process limit, but there's no guarantee I'll never reach it so I'd have to keep checking if my script hasn't died. If I run into that limit at any point, I think I shouldn't have to restart my script.

Apparently the default value is 5 (ref: slack) and I ran into this issue by running the tutorial example (snippet attached) with an input I knew would error (e.g: repo_name="asasdasd") and set retry to some number (e.g. 5)

Reproduction

import httpx
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: dict = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()

@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url.submit(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p.result()]

@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = get_open_issues(repo_name, repo_stats["open_issues_count"])
    issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forksssza 🍴 : {repo_stats['forks_count']}")
    print(f"Average open issues per user 💌 : {issues_per_user:.2f}")

if __name__ == "__main__":
    while True:
        try:
            get_repo_info.serve(name="my-first-deployment")
        except RuntimeError as e:
            if e.args == ("this borrower is already holding one of this CapacityLimiter's tokens",):
                print("Ran into that 'CapacityLimiter's tokens' issue. Restarting.")
                continue
            raise e

Error

Traceback (most recent call last):
  File "/local/home/diegolm/workplace/Ticketador/src/Ticketador/src/flow/orca.py", line 40, in <module>
    get_repo_info.serve(name="my-first-deployment")
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 382, in __call__
    return self.result()
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/home/diegolm/.local/python-3.9.6/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/flows.py", line 756, in serve
    await runner.start(webserver=webserver)
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/runner/runner.py", line 365, in start
    tg.start_soon(
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/utilities/services.py", line 53, in critical_service_loop
    await workload()
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/runner/runner.py", line 643, in _get_and_submit_flow_runs
    return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/runner/runner.py", line 818, in _submit_scheduled_flow_runs
    if self._acquire_limit_slot(flow_run.id):
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/prefect/runner/runner.py", line 788, in _acquire_limit_slot
    self._limiter.acquire_on_behalf_of_nowait(flow_run_id)
  File "/local/home/diegolm/.virtualenvs/xingu/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1846, in acquire_on_behalf_of_nowait
    raise RuntimeError(
RuntimeError: this borrower is already holding one of this CapacityLimiter's tokens

Versions

Version:             2.14.3
API version:         0.8.4
Python version:      3.9.6
Git commit:          f1ff9257
Built:               Thu, Nov 2, 2023 4:12 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

No response

serinamarie commented 10 months ago

Hi @diego-lima, thanks for opening your first issue with Prefect. I was able to reproduce this using a simpler example:

from prefect import flow

@flow(retries=5)
def my_flow():
    raise ValueError("This is a test")

if __name__ == "__main__":
    my_flow.serve(name="my-first-deployment")

We have added this to our backlog to work on as soon as we can.

discdiver commented 9 months ago

FWIW, I was able to produce with @diego-lima's code with Prefect 2.14.5 but not with @serinamarie's code. I ran with the process limit set to 5 and 1.

Result from @serinamarie's example:

...
ValueError: This is a test
11:31:29.277 | ERROR   | Flow run 'hilarious-mosquito' - Finished in state Failed('Flow run encountered an exception. ValueError: This is a test')
11:31:29.565 | INFO    | prefect.flow_runs.runner - Process for flow run 'hilarious-mosquito' exited cleanly.