usegalaxy-eu / tpv-metascheduler-api

Metascheduler for TPV as Service
MIT License
0 stars 5 forks source link

Integrate simplified version of fuzzy-based matchmaking algorithm #12

Open pauldg opened 3 months ago

pauldg commented 3 months ago

Need @abdulrahmanazab to help written/graphical version of the algorithm using the stats we have which we can then integrate in the code base.

abdulrahmanazab commented 3 months ago

Universal Assumption: all destinations in question are getting jobs only from Galaxy

First, we have a list of destinations that actually "can" take the job, i.e. for each of these destinations: Match(j.requirements,d.specifications) > 0.

what we need is an ordered list of these destinations based on the degree of matching, i.e. which d is the best for j

Second, we have a simple logic that is based on: queue size (how many jobs are currently queued to d) and the distance (latitude and longitude based distance calculation between the destination and the job input dataset location)

Now we want to improve this logic as we have new information available, about destination (d) and jobs of a specific tool (t):

Based on all the above, the new logic will be composed

abdulrahmanazab commented 2 months ago

Scheduling algorithm logic: Given:

Algorithm logic:

sanjaysrikakulam commented 3 days ago

@pauldg and I were discussing:

  1. In the above algorithm, we are not taking into account the input dataset size (also, the median input data set size)
  2. Handle cases where there is no median queue/run time because the destination could be new or the tool was never used/run and how do we weigh/rank in such cases.
sanjaysrikakulam commented 3 days ago

A temp implementation of the algorithm:

Input:

  1. List of candidate destinations
  2. For each destination: (list(dicts))
    1. Total number of CPU cores and Memory available
    2. Median waiting time of the current tool in the queue
    3. Median running time of the current tool
    4. Current queue size of the destination
    5. Current number of jobs running on the destination
    6. Current number of unclaimed cores on the destination
    7. Current number of unclaimed memory on the destination
    8. Distance between the input data location and the destination
  3. Job requirements (dict)
    1. Number of CPU cores required
    2. Amount of memory required
def calculate_matching_score(destination: dict) -> float:
    """
    Calculate the matching score between a job and a destination.
    """
    median_waiting_time = destination.get('median_waiting_time', None)
    queue_size = destination.get('queue_size', 1)
    median_running_time = destination.get('median_running_time', None)
    running_jobs = destination.get('running_jobs', 1)

    # Queue matching factor (qm).
    if median_waiting_time > 0 and queue_size > 0:
        qm = 1 / (median_waiting_time * queue_size)
    else:
        qm = float('inf')

    # Compute matching factor (cm).
    if median_running_time > 0 and running_jobs > 0:
        cm = 1 / (median_running_time * running_jobs)
    else:
        cm = float('inf')

    # Final matching score
    return qm + cm

def select_best_destination(job_requirements: dict, destinations: list) -> list:
    """
    Selects the best destination for a job based on job requirements and destination metrics.
    """
    cpu_required = job_requirements['cpu_cores']
    memory_required = job_requirements['memory']

    # Filter out destinations that can't meet basic requirements based on the "real-time" data
    viable_destinations = []
    for dest in destinations:
        if dest['unclaimed_cores'] > cpu_required and dest['unclaimed_memory'] > memory_required:
            viable_destinations.append(dest)

    # Sort by distance to input data location (ascending)
    viable_destinations.sort(key=lambda x: x.get('distance_to_data', float('inf')))

    # Calculate matching scores for each viable destination
    for dest in viable_destinations:
        dest['matching_score'] = calculate_matching_score(dest)

    # Sort by matching score (descending)
    viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True)

    return viable_destinations