PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

EcsTaskRunner #14345

Open yaronlevi opened 3 months ago

yaronlevi commented 3 months ago

Prefect Version

3.x

Describe the current behavior

Creating this idea after a chat I had with Will Raphaelson.

So currently we have:

What about an EcsTaskRunner()? The Ray and Dask TaskRunners allow me to .submit() a task to some “compute pool”, where the task is attached to a one-off instance with dedicated memory and CPU. Isn’t this exactly the same story with AWS ECS? Ray and Dask are quite “exotic” technologies that mostly come up when you start talking about highly parallel machine learning/AI workloads. But what about a more common use case: a weekly job that needs to process many files in parallel, where the files differ significantly in size:

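# EcsTaskRunner is the runner proposed in this issue; it does not exist in Prefect yet.
@flow(task_runner=EcsTaskRunner())
def my_weekly_huge_files_job():
    files_on_s3 = get_list_of_huge_files()
    for path, file_size in files_on_s3:
        if file_size > xxx:  # xxx: size threshold, left unspecified
            process_file.submit(path, memory_size='4gb')
        else:
            process_file.submit(path, memory_size='1gb')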

The code above would be super robust: a certain huge file might crash its instance, but that won’t affect the other instances. I believe such an EcsTaskRunner() would be adopted by the community very quickly, as many already use ECS for their ECS push work pools. This opens up many possibilities for distributed compute on remote, separate machines, without introducing big guns like Ray or Dask.
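To make the intended behavior concrete, here is a minimal standard-library sketch of the two ideas in the snippet above: picking a memory tier by file size, and keeping one file's failure from affecting the rest. It is not Prefect code and not a proposed implementation. The names pick_memory_size, run_isolated, and fake_process_file, the sample file list, and the threshold value are all hypothetical. A thread pool only isolates Python exceptions; with the proposed runner each submit would presumably be its own ECS task, which would also isolate memory and hard crashes.

```python
from concurrent.futures import ThreadPoolExecutor


def pick_memory_size(file_size, threshold):
    """Return the memory tier: '4gb' above the threshold, otherwise '1gb'."""
    return "4gb" if file_size > threshold else "1gb"


def run_isolated(files, process_file, threshold):
    """Submit one job per file; collect successes and failures separately."""
    results = {}
    failures = {}
    with ThreadPoolExecutor() as pool:
        # One future per file, each with a memory tier chosen from its size.
        futures = {
            path: pool.submit(process_file, path, pick_memory_size(size, threshold))
            for path, size in files
        }
        for path, future in futures.items():
            try:
                results[path] = future.result()
            except Exception as exc:
                # A failing file is recorded; the other files still complete.
                failures[path] = exc
    return results, failures


def fake_process_file(path, memory_size):
    """Hypothetical stand-in for process_file that fails on one input."""
    if path == "corrupt.bin":
        raise RuntimeError(f"out of memory on {path}")
    return f"{path} processed with {memory_size}"


# Hypothetical sizes and threshold, in arbitrary units.
files = [("a.csv", 10), ("big.parquet", 5_000), ("corrupt.bin", 9_000)]
results, failures = run_isolated(files, fake_process_file, threshold=1_000)
```

After this runs, results holds the two files that succeeded and failures holds the exception raised for corrupt.bin, which is the kind of per-file outcome the flow above would want to inspect.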

As a side note, maybe Dask and Ray are already much more accessible these days? I could swipe a credit card and use Coiled.io (a managed Dask cluster) to get an experience very similar to what I’ve described in the code above. Still, ECS would be much more common and affordable.

Describe the proposed behavior

Add support for EcsTaskRunner() as many already use ECS, instead of using "big guns" like Dask and Ray.

Example Use

No response

Additional context

No response

WillRaphaelson commented 3 months ago

thanks @yaronlevi - the need is clear to me - to be able to execute prefect tasks as ecs tasks with an elegant UX. We'll be taking up design for this problem in the next weeks to months and I'll keep the community apprised on this and solicit feedback via this issue.

rmnvncnt commented 1 month ago

I think this is a very interesting idea. For now, I deal with similar use cases using deployments calling deployments and it can become quite complicated at times. With this method, it can be difficult to know what started a given flow run.

My biggest issue so far with the way Prefect handles ECS tasks is the development cycle. I usually run my flows and tasks locally until I'm satisfied with the result. Then I try to deploy my flows to the remote infrastructure, and things start to get complicated:

  1. I build and push an image of my code to ECR. It can take a moment, but there's no way around it.
  2. I run my deployment.
  3. I wait for my capacity provider to pop a box and start to work.
  4. The box dies because of, say, an OOM error.
  5. I attempt to fix the issue.
  6. Rinse and repeat.

I'm not sure my process is correct, and I guess there is room for improvement, but @yaronlevi's solution would let me test the code on the infrastructure directly, without the whole deployment step. It doesn't mock my entire remote infrastructure, but it seems to make the cycle shorter.