JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License
29 stars 11 forks source link

[feature request] Worker scheduler based on current load #76

Open MilesCranmer opened 3 years ago

MilesCranmer commented 3 years ago

I make significant use of Julia's Distributed functionality in my SymbolicRegression.jl package. It uses asynchronous computing for a genetic algorithm: there are ~10 "populations" of individuals which can evolve independently of each other, can be merged asynchronously, and then continue evolving separately again. Thus, I can put each population into a separate @spawnat statement, then fetch and merge them into a bank of individuals when they are complete, and then @spawnat again with a mixed population.

However, the evolution time over each population is quite variable. I find that if I use a number of procs equal to my cores, oftentimes there will be many processors not being used at any given time. e.g., I have 20 populations, and 10 cores. Sometimes, @spawnat :any will be "out of phase" with the busiest workers, and keep allocating jobs to the already-busy workers, while the free workers don't receive any work. [having procs > cores would be an overkill solution that would lead to issues, such as load being larger than # of cores, too much memory usage, longer startup time, etc]

To fix this, I wrote a simple (implementation-specific) scheduler that allocates to the processor with the fewest jobs already in its queue. This boosts performance by nearly double.

So, I am wondering if @spawnat :any could be a bit more clever about which worker it puts a job onto? This is the current worker scheduler: https://github.com/JuliaLang/julia/blob/bb5b98e72a151c41471d8cc14cacb495d647fb7f/stdlib/Distributed/src/macros.jl#L3-L13 As you can see, it goes through the list of workers one-by-one, regardless of whether they are free or busy.

This is what I am using for a scheduler, which gives me a big performance boost:

function next_worker(worker_assignment::Dict{Any, Int}, procs::Vector{Int})::Int

    # Count number of jobs on each process:
    job_counts = Dict(proc=>0 for proc in procs)
    for (key, value) in worker_assignment
        job_counts[value] += 1
    end

    # Return worker with fewest jobs:
    least_busy_worker = reduce(
        (proc1, proc2) -> job_counts[proc1] <= job_counts[proc2] ? proc1 : proc2,
        procs
    )
    return least_busy_worker
end

It simply counts the number of jobs allocated to each worker, and then allocates to the one with the fewest. (independent of the expected length of the jobs, etc.).

Could something like this be implemented more generally in the Distributed module of Julia? e.g., have a counter for each process that is incremented upon spawning, decremented upon job completion.

Thanks! Miles

vchuravy commented 3 years ago

I think features like these are best developed outside the standard library. You might want to look at Dagger.jl which does this kind of scheduling.

MilesCranmer commented 3 years ago

Sounds good to me. Maybe a warning can be added to the docs for @spawn and @spawnat? Or perhaps the docs could be reworded? Reading through the docs, I assumed it would allocate the job to the next available worker, not simply cycle through the workers regardless of their load, (which seems far too trivial for even a stdlib scheduler; even the unix command xargs -P <N> has a more advanced scheduler).

As an example, if it so happens that the # of workers is an integer ratio with the number of jobs to be submitted, and one calls @spawnat :any, and then loops through this list many times, this will result in the longest job "stacking up" on one worker, which in the limit, would result in a substantial slowdown - losing all benefits of parallelism.

Even allocating a job to a random worker would be enough to get around this, but it might hurt performance in a stochastic way for few jobs submitted.

MilesCranmer commented 3 years ago

Related discourse thread: https://discourse.julialang.org/t/how-to-maximize-cpu-utilization-spawn-assigning-to-busy-workers-use-pmap-instead/53648/13. (FYI I can't use pmap since I need to work asynchronously and sequentially)