pytorch / torchx

TorchX is a universal job launcher for PyTorch applications. TorchX is designed to have fast iteration time for training/research and support for E2E production ML pipelines when you're ready.
https://pytorch.org/torchx

[RFC][torchx/schedulers - aws batch] support wildcard job queue selection #634

Open kiukchung opened 1 year ago

kiukchung commented 1 year ago

Description

For the AWS Batch scheduler integration in torchx, support wildcards in the job_queue runopt with simple (or configurable or extensible) queue selection logic. For instance, assume that my organization uses a job queue naming convention of the form:

${TEAM_NAME}-${CLUSTER_TYPE}-${REGION}

Example:

pt_r2p-gpu_cluster-us-east-1a
pt_r2p-gpu_cluster-us-east-1b
pt_r2p-gpu_cluster-us-east-1c
pt_r2p-cpu_cluster-us-east-1a
pt_r2p-cpu_cluster-us-east-1b
pt_r2p-trainium_cluster-us-east-1a
pt_r2p-trainium_cluster-us-east-1c

pt_core-gpu_cluster-us-east-1a
pt_core-cpu_cluster-us-east-1a

If I'm on the pt_r2p team and want to submit a job to any GPU compute environment that has free capacity, regardless of region, then I can use a wildcard on the ${REGION} portion of the job queue name:

[aws_batch]
job_queue=pt_r2p-gpu_cluster-*
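
As a minimal sketch of how such a pattern could be expanded, the snippet below matches the example queue names against the wildcard using Python's standard `fnmatch` module. The `match_queues` helper and the hard-coded `ALL_QUEUES` list are assumptions made for illustration; in a real integration the names would come from AWS Batch rather than a literal list.

```python
from fnmatch import fnmatchcase
from typing import List

# Queue names copied from the example naming convention above.
ALL_QUEUES = [
    "pt_r2p-gpu_cluster-us-east-1a",
    "pt_r2p-gpu_cluster-us-east-1b",
    "pt_r2p-gpu_cluster-us-east-1c",
    "pt_r2p-cpu_cluster-us-east-1a",
    "pt_r2p-cpu_cluster-us-east-1b",
    "pt_r2p-trainium_cluster-us-east-1a",
    "pt_r2p-trainium_cluster-us-east-1c",
    "pt_core-gpu_cluster-us-east-1a",
    "pt_core-cpu_cluster-us-east-1a",
]


def match_queues(pattern: str, queue_names: List[str]) -> List[str]:
    """Hypothetical helper: keep the names that match a shell-style wildcard.

    fnmatchcase is case-sensitive on every platform and preserves input order.
    """
    return [name for name in queue_names if fnmatchcase(name, pattern)]


matched = match_queues("pt_r2p-gpu_cluster-*", ALL_QUEUES)
for name in matched:
    print(name)
```

With the list above, the pattern selects the three pt_r2p GPU queues and nothing from pt_core or the CPU/Trainium clusters.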

Motivation/Background

Ideally, with AWS Batch we would create a single job queue (JQ) connected to multiple compute environments (CE) and always submit to that JQ, letting Batch figure out which CE the job should be submitted to. With Fair Share (FS) scheduling, announced a year ago (see announcement), this is theoretically possible. However, many users of AWS Batch are still using the originally supported FIFO scheduling policy, in which case a single JQ is impractical in a multi-team scenario, since users from other teams may affect the scheduling overhead of my team. This escalates quickly in cases where teams BYO (bring your own) capacity.

Detailed Proposal

Support wildcards in job_queue names for the aws_batch scheduler with the following MVP queue selection algorithm (greedy):

  1. Find all job queues that match the wild-card expression
  2. For each job queue pull the details of the CE that it is hooked up to
  3. Filter the CEs down to the ones that actually support the host type + quantity that the job needs
  4. For each filtered CE look at the free resources and rank them by most-free -> least-free
  5. Pick the job queue that has the most CEs with the highest rank

This algorithm effectively chooses the JQ that will yield the least wait time in the queue for the job.
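
The five steps above can be sketched over in-memory data as follows. Everything here is hypothetical: the `ComputeEnv` and `JobQueue` dataclasses, their fields, and `select_queue` are illustration-only names, not torchx or AWS types. Step 5 is also simplified to "score each queue by its freest eligible CE", which is one possible reading of the ranking rule rather than the proposal's definitive meaning.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import List, Optional


@dataclass
class ComputeEnv:
    """Hypothetical snapshot of a compute environment."""
    name: str
    instance_type: str
    max_hosts: int
    used_hosts: int

    @property
    def free_hosts(self) -> int:
        return max(self.max_hosts - self.used_hosts, 0)


@dataclass
class JobQueue:
    """Hypothetical job queue with the CEs it is hooked up to (step 2)."""
    name: str
    compute_envs: List[ComputeEnv]


def select_queue(pattern: str, queues: List[JobQueue],
                 instance_type: str, num_hosts: int) -> Optional[JobQueue]:
    best: Optional[JobQueue] = None
    best_free = -1
    for jq in queues:
        # Step 1: keep only queues matching the wildcard.
        if not fnmatchcase(jq.name, pattern):
            continue
        # Step 3: keep CEs that offer the host type and are large enough.
        eligible = [ce for ce in jq.compute_envs
                    if ce.instance_type == instance_type
                    and ce.max_hosts >= num_hosts]
        if not eligible:
            continue
        # Steps 4-5 (simplified assumption): score by the freest eligible CE.
        free = max(ce.free_hosts for ce in eligible)
        if free > best_free:  # ties keep the first queue seen
            best, best_free = jq, free
    return best


queues = [
    JobQueue("pt_r2p-gpu_cluster-us-east-1a", [ComputeEnv("ce-1a", "p4d.24xlarge", 8, 7)]),
    JobQueue("pt_r2p-gpu_cluster-us-east-1b", [ComputeEnv("ce-1b", "p4d.24xlarge", 8, 2)]),
    JobQueue("pt_r2p-gpu_cluster-us-east-1c", [ComputeEnv("ce-1c", "g5.xlarge", 16, 0)]),
    JobQueue("pt_r2p-cpu_cluster-us-east-1a", [ComputeEnv("ce-cpu", "p4d.24xlarge", 8, 0)]),
]
chosen = select_queue("pt_r2p-gpu_cluster-*", queues, "p4d.24xlarge", 2)
print(chosen.name if chosen else None)
```

In this made-up data the us-east-1b queue wins: 1a matches but has only one free host, 1c has the wrong host type, and the CPU queue is excluded by the pattern.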

To actually implement the greedy algorithm above, I suggest that we add chainable selection algorithms. For instance, the algorithm above can be expressed as a chain of primitives:

jqs = get_matching("pt_r2p-gpu_cluster-*")
jqs = filter_resource(jqs, role.resource)
jqs = order_by(jqs, Ordering.FREE_CAPACITY, desc=True)

jqs[0] # <-- select this one to run

Similarly a "first-match" algorithm can be implemented as:

get_matching("pt_r2p-gpu_cluster-*")[0]

We can follow torchdata's datapipe interface such that each function in the chain has the signature:

def fn(jqs: List[JobQueue], *args, **kwargs) -> List[JobQueue]:
   """
   Returns a sorted/filtered/manipulated list of job queues to pass to the next chained fn.
   """
   pass
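
To make the chaining idea concrete, here is a rough runnable sketch in which every step takes a list of job queues and returns a list of job queues. All names (`JobQueue`, `matching`, `with_min_free`, `order_by_free`, `run_chain`) and the pre-computed `free_hosts` field are assumptions for illustration. Unlike `get_matching` above, `matching` receives the candidate list as its first argument so that it fits the shared signature.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from functools import partial, reduce
from typing import Callable, List


@dataclass(frozen=True)
class JobQueue:
    name: str
    free_hosts: int  # hypothetical, assumed to be computed elsewhere


# Every step maps a list of queues to a (filtered/sorted) list of queues.
Step = Callable[[List[JobQueue]], List[JobQueue]]


def matching(jqs: List[JobQueue], pattern: str) -> List[JobQueue]:
    return [jq for jq in jqs if fnmatchcase(jq.name, pattern)]


def with_min_free(jqs: List[JobQueue], min_hosts: int) -> List[JobQueue]:
    return [jq for jq in jqs if jq.free_hosts >= min_hosts]


def order_by_free(jqs: List[JobQueue], desc: bool = True) -> List[JobQueue]:
    return sorted(jqs, key=lambda jq: jq.free_hosts, reverse=desc)


def run_chain(jqs: List[JobQueue], steps: List[Step]) -> List[JobQueue]:
    """Feed the output of each step into the next one."""
    return reduce(lambda acc, step: step(acc), steps, jqs)


ALL = [
    JobQueue("pt_r2p-gpu_cluster-us-east-1a", 1),
    JobQueue("pt_r2p-gpu_cluster-us-east-1b", 6),
    JobQueue("pt_r2p-gpu_cluster-us-east-1c", 3),
    JobQueue("pt_core-gpu_cluster-us-east-1a", 9),
]

greedy = [
    partial(matching, pattern="pt_r2p-gpu_cluster-*"),
    partial(with_min_free, min_hosts=2),
    order_by_free,
]
ranked = run_chain(ALL, greedy)
first_match = run_chain(ALL, [partial(matching, pattern="pt_r2p-gpu_cluster-*")])[0]
print(ranked[0].name, first_match.name)
```

Binding the extra arguments with `functools.partial` keeps each step a plain function of one list, so new selection policies are just different lists of steps.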

Alternatives

  1. Resources/guidelines/script to migrate from FIFO queues to Fair-Share.

Additional context/links

N/A

d4l3k commented 1 year ago

Seems like a reasonable feature to me. Is there also interest in global queue/multiregion support? That would be interesting to have as well though likely much more work to implement for the non-schedule methods.

We should skip any ranking logic if there's no glob in the cluster
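
One way this short-circuit could look, as a sketch only: `has_glob`, `resolve_queue`, and the `select` callback are hypothetical names, and the set of characters treated as wildcards is an assumption based on shell-style patterns.

```python
from typing import Callable

# Characters that shell-style patterns treat as wildcards (assumption).
GLOB_CHARS = frozenset("*?[")


def has_glob(job_queue: str) -> bool:
    """True if the runopt value contains any wildcard character."""
    return any(ch in GLOB_CHARS for ch in job_queue)


def resolve_queue(job_queue: str, select: Callable[[str], str]) -> str:
    """Return a literal queue name untouched; only patterns trigger selection."""
    if not has_glob(job_queue):
        return job_queue  # no lookup and no ranking needed
    return select(job_queue)


print(has_glob("pt_r2p-gpu_cluster-*"))
print(has_glob("pt_r2p-gpu_cluster-us-east-1a"))
```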

Picking the best queue is tricky -- "most free" is a bit hard to define since different queues could have different types of workloads, but something like `sum(hosts for hosts in job_queue)/sum(ce max hosts)` could serve as a starting point. Handling autoscaling CEs is also a bit tricky -- we need to get the max size, but max size isn't necessarily equivalent to allocatable hosts.
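
A small sketch of that ratio, assuming the numerator means hosts currently in use across a queue's CEs. The `ComputeEnv` dataclass, its fields, and the sample numbers are made up for illustration. As noted above, `max_hosts` may overstate what an autoscaling CE can actually allocate, so this is only a rough signal.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ComputeEnv:
    used_hosts: int  # hypothetical: hosts currently running jobs
    max_hosts: int   # hypothetical: configured upper bound for the CE


def utilization(ces: List[ComputeEnv]) -> float:
    """Used hosts divided by max hosts, summed over a queue's CEs."""
    total_max = sum(ce.max_hosts for ce in ces)
    if total_max == 0:
        return 1.0  # treat a queue with no capacity as full
    return sum(ce.used_hosts for ce in ces) / total_max


queues: Dict[str, List[ComputeEnv]] = {
    "pt_r2p-gpu_cluster-us-east-1a": [ComputeEnv(6, 8), ComputeEnv(2, 8)],
    "pt_r2p-gpu_cluster-us-east-1b": [ComputeEnv(1, 4)],
}
# Lower utilization is read here as "more free".
least_used = min(queues, key=lambda name: utilization(queues[name]))
print(least_used)
```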