twosigma / Cook

Fair job scheduler on Kubernetes and Mesos for batch workloads and Spark
Apache License 2.0

Ideas to speed ranking #342

Open · wyegelwel opened this issue 7 years ago

wyegelwel commented 7 years ago

I figure it is worth collecting ideas that can be used to speed up ranking. The assumption is that ranking is largely bottlenecked by queries to datomic. Most solutions will involve re-using computation or avoiding datomic queries.

Currently on each ranking cycle, we throw out all values computed previously, query datomic for the most recent set of waiting and running jobs and compute the global ordering anew.

Rough pseudocode would be:

# Query datomic for every waiting and running job
waiting_jobs = get_waiting_jobs(db)
running_jobs = get_running_jobs(db)
active_jobs = waiting_jobs + running_jobs
# Build per-user queues, each sorted in that user's priority order
user_to_jobs = active_jobs.group_by('user')
user_to_sorted_jobs = map_vals(sort_user_queue, user_to_jobs)
# Ranking: annotate each job with its DRU, then merge into a global order
user_to_sorted_jobs_with_dru = map_vals(annotate_dru, user_to_sorted_jobs)
global_order = sorted_merge('dru', user_to_sorted_jobs_with_dru)
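For reference, the final sorted_merge step does not require a full re-sort: each per-user queue is already ordered by DRU, so the global order can come from an n-way merge. A minimal sketch of that idea (the 'dru' field and the shape of the queues here are assumptions for illustration, not the actual Cook code):

import heapq

def sorted_merge(key, user_to_sorted_jobs_with_dru):
    # Each user's queue is already sorted by DRU, so merging the queues
    # yields the global order without re-sorting all jobs.
    return list(heapq.merge(*user_to_sorted_jobs_with_dru.values(),
                            key=lambda job: job[key]))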

Maintain sorted user queues and update incrementally

This requires keeping the user_to_sorted_jobs map above and only querying for the waiting and running jobs whose states have changed since the last time we queried. Given the "new waiting" and "new running" jobs, we need to add them to the user_to_sorted_jobs map and then continue the algorithm from there.

In pseudocode it may look like:

# Only fetch jobs whose state has changed since the last ranking cycle
waiting_jobs = get_waiting_jobs_since(db, last_query_time)
running_jobs = get_running_jobs_since(db, last_query_time)
# Insert the changed jobs into the already-sorted per-user queues
for job in waiting_jobs + running_jobs:
    user_to_sorted_jobs.get(job['user']).sorted_add(job)
# Ranking proceeds exactly as before
user_to_sorted_jobs_with_dru = map_vals(annotate_dru, user_to_sorted_jobs)
global_order = sorted_merge('dru', user_to_sorted_jobs_with_dru)

We expect get_waiting_jobs_since and get_running_jobs_since to be faster than getting all waiting and running jobs, and we expect that adding a relatively small number of jobs (only those that changed since the last cycle) to the already-sorted user queues will be faster than recomputing from scratch.
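As a rough illustration of the incremental insert (not the actual Cook code; the job fields, sort key, and queue representation below are all hypothetical), each user's queue could be kept sorted with a binary-search insert:

import bisect
from collections import defaultdict

def job_sort_key(job):
    # Hypothetical per-user ordering: higher priority first, then earlier submission
    return (-job['priority'], job['submit_time'])

# user -> list of (sort_key, uuid, job), kept sorted at all times
user_to_sorted_jobs = defaultdict(list)

def sorted_add(user_queue, job):
    # Insert one changed job without re-sorting the whole queue
    bisect.insort(user_queue, (job_sort_key(job), job['uuid'], job))

def apply_changes(changed_jobs):
    for job in changed_jobs:
        sorted_add(user_to_sorted_jobs[job['user']], job)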

Maintain a cache of waiting and running jobs

Since we believe that most of the time is spent loading data from datomic (getting the initial running and waiting lists and then getting fields from them), maintaining a cache of running and waiting jobs as Clojure maps instead of datomic entities should improve ranking.

In pseudocode this may look like:

# Read jobs from an in-memory cache instead of querying datomic
waiting_jobs = get_waiting_jobs(cache)
running_jobs = get_running_jobs(cache)
active_jobs = waiting_jobs + running_jobs
user_to_jobs = active_jobs.group_by('user')
user_to_sorted_jobs = map_vals(sort_user_queue, user_to_jobs)
# Ranking: unchanged from the current algorithm
user_to_sorted_jobs_with_dru = map_vals(annotate_dru, user_to_sorted_jobs)
global_order = sorted_merge('dru', user_to_sorted_jobs_with_dru)

Note that the rest of the code stays the same; the only difference is that we hit a cache of jobs, which should be fast, and since the jobs are now Clojure maps the processing we do on them should also be fast. For example, one of the most time-consuming parts of ranking today is sorting the user queues, yet sorting a list of 100,000 numbers takes about 75 ms, as opposed to the multiple seconds we observe when sorting datomic entities.
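For concreteness, a minimal sketch of the read side of such a cache (the cache shape, field names, and helpers are hypothetical, and in Cook this would be Clojure maps rather than Python dicts):

# Hypothetical cache: job uuid -> plain map of the fields ranking needs,
# maintained outside the ranking loop
cache = {
    'job-1': {'uuid': 'job-1', 'user': 'alice', 'status': 'waiting', 'priority': 50, 'mem': 4096, 'cpus': 2.0},
    'job-2': {'uuid': 'job-2', 'user': 'bob', 'status': 'running', 'priority': 50, 'mem': 1024, 'cpus': 1.0},
}

def get_waiting_jobs(cache):
    # Pure in-memory filter; no datomic round trip during ranking
    return [job for job in cache.values() if job['status'] == 'waiting']

def get_running_jobs(cache):
    return [job for job in cache.values() if job['status'] == 'running']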

Please add more ideas, comments, or counterpoints to the ones above.

wyegelwel commented 7 years ago

For maintaining a cache of running and waiting jobs, this could be implemented by subscribing to the datomic transaction report queue and updating the cache any time a job changes state. On update, the code could use datomic pull to get the latest data on the job as a Clojure map. To avoid missing jobs, the cache will need to periodically query the db for all running and waiting jobs and refresh itself, following a lambda architecture. Doing these refreshes on a minute time scale is likely fine.

Maintaining a cache of the current entities that satisfy a property is likely generic enough that it can be written as a library that cook depends on.
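A minimal sketch of what such a generic library could look like, assuming a caller-supplied change feed and pull function (stand-ins for the datomic transaction report queue and datomic pull); it is shown in Python for brevity, whereas the real library would presumably be Clojure:

import threading
import time

class EntityCache:
    # Keeps an in-memory map of entities that currently satisfy a predicate.
    # Incremental path: on_change re-pulls single entities as a change feed reports them.
    # Batch path: full_refresh periodically rebuilds the cache so missed events self-heal
    # (lambda-architecture style).
    def __init__(self, pull_entity, query_all_ids, predicate, refresh_seconds=60):
        self.pull_entity = pull_entity      # id -> plain map of current entity state, or None
        self.query_all_ids = query_all_ids  # () -> ids that currently satisfy the predicate
        self.predicate = predicate          # plain map -> bool, e.g. status in {waiting, running}
        self.refresh_seconds = refresh_seconds
        self._lock = threading.Lock()
        self._entities = {}

    def on_change(self, entity_id):
        # Incremental update driven by the change feed
        entity = self.pull_entity(entity_id)
        with self._lock:
            if entity is not None and self.predicate(entity):
                self._entities[entity_id] = entity
            else:
                self._entities.pop(entity_id, None)

    def full_refresh(self):
        # Batch repair: rebuild the cache from a full query
        fresh = {eid: self.pull_entity(eid) for eid in self.query_all_ids()}
        with self._lock:
            self._entities = {eid: e for eid, e in fresh.items()
                              if e is not None and self.predicate(e)}

    def snapshot(self):
        # Consistent point-in-time view for the ranking loop
        with self._lock:
            return list(self._entities.values())

    def start_refresh_loop(self):
        def loop():
            while True:
                time.sleep(self.refresh_seconds)
                self.full_refresh()
        threading.Thread(target=loop, daemon=True).start()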

wyegelwel commented 7 years ago

Although we currently track the time to rank, the real metric of interest is the time until a job is considerable by the matcher. Therefore, submit-to-rank time is something we will want to monitor, since a single rank cycle is no longer a guarantee that a job will become considerable.
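One way such a metric might be recorded (the field names and the list standing in for a real metrics histogram are placeholders, not existing Cook code):

import time

already_ranked = set()
submit_to_rank_seconds = []  # stand-in for a real metrics histogram/timer

def record_rank_latencies(global_order):
    # Record, once per job, the delay between submission and the first
    # rank cycle in which the job appears in the global order.
    now = time.time()
    for job in global_order:
        if job['uuid'] not in already_ranked:
            already_ranked.add(job['uuid'])
            submit_to_rank_seconds.append(now - job['submit_time'])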