PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Add the possibility for an internal rate limiter for `task.map(...)` #12252

Open obendidi opened 6 months ago

obendidi commented 6 months ago

Prefect Version

2.x

Describe the current behavior

Currently, task.map(...) submits all task runs concurrently. When mapping over a large number of items, this can put more load on the Prefect server than it can handle.

A workaround I've been using is to convert this:

@flow
def my_flow():
  my_list = list(range(1000))
  my_result_futures = my_task.map(my_list)

to this:

@flow
def my_flow():
  my_list = list(range(1000))
  my_result_futures = []
  for x in my_list:
    rate_limit("my-task-runs-limiter")  # from prefect.concurrency.sync; the limit itself is configured globally
    f = my_task.submit(x)
    my_result_futures.append(f)

Describe the proposed behavior

What I propose is something like this:

@flow
def my_flow():
  my_list = list(range(1000))
  my_result_futures = my_task.map(my_list, rate_limiter="my-task-runs-limiter", rate_limiter_occupy=1)

taljaards commented 6 months ago

Have you taken a look at concurrency limits / task run concurrency limits (tags)?

obendidi commented 6 months ago

> Have you taken a look at concurrency limits / task run concurrency limits (tags)?

Hmm, the use case I'm talking about here is the same as the one in https://docs.prefect.io/latest/guides/global-concurrency-limits/#throttling-task-submission. Using .map() to create a lot of tasks can create a bottleneck on the Prefect server. Instead of writing a loop and calling a rate limiter before each .submit(), I was thinking the same solution could be applied internally by the .map() method.
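To make the throttling idea concrete, here is a minimal stdlib-only sketch of the pattern: block on a limiter before each submission so that the backend sees a bounded submission rate. `IntervalLimiter` and `submit_throttled` are hypothetical names, and the fixed-interval limiter is only a local stand-in for Prefect's server-backed `rate_limit`, which is configured globally and is not implemented this way.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


class IntervalLimiter:
    """Hypothetical stand-in for a rate limiter: one acquire per `interval` seconds."""

    def __init__(
        self,
        interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()

    def acquire(self) -> None:
        """Block until the next submission slot is available."""
        now = self._clock()
        if now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval


def submit_throttled(
    executor: ThreadPoolExecutor,
    fn: Callable[[Any], Any],
    items: Iterable[Any],
    limiter: IntervalLimiter,
) -> List[Future]:
    """Submit one job per item, waiting on the limiter before each submission."""
    futures = []
    for item in items:
        limiter.acquire()  # throttle the submission itself, not the execution
        futures.append(executor.submit(fn, item))
    return futures


if __name__ == "__main__":
    limiter = IntervalLimiter(interval=0.01)
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = submit_throttled(pool, lambda x: x * 2, range(5), limiter)
        print([f.result() for f in futures])
```

The loop has the same shape as the `.submit()` workaround: the limiter call sits directly in front of each submission, which is the step a built-in option on `.map()` would have to perform internally.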

obendidi commented 6 months ago

I made a wrapper around task.map with a rate limiter. I'd be happy to get feedback on whether it's the best approach to the problem stated above:

import typing as tp

from prefect import Task
from prefect.concurrency.sync import rate_limit
from prefect.futures import PrefectFuture

T = tp.TypeVar("T")

def map_task_with_rate_limiter(
    task: Task[tp.Any, T],
    *,
    map_on: tp.Dict[str, tp.Sequence[tp.Any]],
    rate_limiter_name: str = "task-runs-limiter",
    **task_kwargs: tp.Any,
) -> tp.List[PrefectFuture[T, tp.Any]]:
    """Wrapper around prefect's 'task.map()' to add a rate limiter around submitting
    tasks to prefect's backend.

    It uses a pre-defined task limiter that is defined globally.
    """
    map_on_lengths = {len(v) for v in map_on.values()}
    if len(map_on_lengths) != 1:
        raise ValueError("All 'map_on' values must have the same length")

    map_on_size = map_on_lengths.pop()

    futures = []
    for i in range(map_on_size):
        mapped_kwargs = {k: v[i] for k, v in map_on.items()}
        rate_limit(rate_limiter_name, occupy=1)
        futures.append(task.submit(**task_kwargs, **mapped_kwargs))
    return futures

desertaxle commented 3 months ago

Hey @obendidi! In our 3.0 release candidate, we added a ThreadPoolTaskRunner to allow local control of concurrent runs via a max_workers parameter. That parameter hasn't made it into a release yet, but I think it will address your use case once it's released!
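As a rough illustration of what a `max_workers` cap provides, here is a stdlib sketch using `concurrent.futures.ThreadPoolExecutor`. It is an analogy and not Prefect's `ThreadPoolTaskRunner` itself: every item is handed to the pool up front, but at most `max_workers` jobs run at the same time. `run_bounded` and its counters are hypothetical names used only for this example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def run_bounded(n_items: int, max_workers: int) -> Tuple[List[int], int]:
    """Run n_items small jobs on a capped pool; return results and peak concurrency."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def job(x: int) -> int:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate a short unit of work
        with lock:
            active -= 1
        return x * x

    # The pool never runs more than max_workers jobs at once.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, range(n_items)))
    return results, peak


if __name__ == "__main__":
    results, peak = run_bounded(n_items=20, max_workers=3)
    print(len(results), "results, peak concurrency:", peak)
```

Note that this caps how many jobs run concurrently, which is not the same thing as limiting the rate of submission as the `rate_limit` workaround does.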