JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

Scheduler seems too eager on pushing jobs to workers #286

Open DrChainsaw opened 2 years ago

DrChainsaw commented 2 years ago

Main problem is that adding more workers to the pool is ineffective since there are no unassigned tasks.

I suppose this might be done to reduce scheduling latency. If it is, it would be useful to be able to control the amount of overprovisioning for cases when one intends to add more workers (e.g. as jobs are started on a cluster).

julia> using Distributed, Dagger

julia> addprocs(5, exeflags="--project=test")
5-element Vector{Int64}:
 2
 3
 4
 5
 6

julia> @everywhere begin using Dagger, Distributed
                   function expensivejob(i)
                       @info "Started task $i on  $(myid())"
                       while true end
                   end
               end
julia> ts = delayed(vcat)([delayed(expensivejob)(i) for i in 1:200]...);

julia> collect(ts)
[ Info: Started task 2 on  3
[ Info: Started task 7 on  3
[ Info: Started task 12 on  3
[ Info: Started task 17 on  3
[ Info: Started task 22 on  3
[ Info: Started task 27 on  3
...
[ Info: Started task 180 on  6
[ Info: Started task 185 on  6
[ Info: Started task 190 on  6
[ Info: Started task 195 on  6
[ Info: Started task 200 on  6

jpsamaroo commented 2 years ago

Currently we make no guarantees about when and where work will be scheduled (unless constrained by the user via scopes, single, procutil, and similar mechanisms), only that input-output ordering is maintained.

However, what's happening here is that we aren't being careful to ensure that tasks with new signatures (Tuple{typeof(expensivejob), Int} in this case) are executed exclusively the first time we see them, which is what we used to do in order to estimate function execution time. I don't plan to go back to that system as-is, but I think we can fix this via distributed work stealing, which I have plans to implement anyway.

DrChainsaw commented 2 years ago

> Currently we make no guarantees about when and where work will be scheduled (unless constrained by the user via scopes, single, procutil, and similar mechanisms), only that input-output ordering is maintained.

Ok, so is the current strategy to just start a very large number of tasks on each processor then and let the task scheduler handle it? I tried the above example with 20000 jobs and it seems to stop starting jobs after around 2000. That was a surprisingly large number for me. What is the advantage over just keeping them in a Thunk queue? Concurrency in case they yield?

Just to avoid misunderstandings, expensivejob never terminates and therefore I would expect the scheduler to just stop assigning tasks after each worker got one task. I suppose that might happen by accident with the time estimation thing you mentioned.

Would distributed work stealing rebalance out the tasks among processors in case more processors are added or some group of tasks finishes faster?

jpsamaroo commented 2 years ago

> Ok, so is the current strategy to just start a very large number of tasks on each processor then and let the task scheduler handle it?

Basically, yeah. Until we have a good way to determine if a task is going to be compute-heavy or yield-heavy, we're going to just schedule as much work as we can (still taking network transfers and estimated worker load into account). An upcoming PR will temper this by stopping scheduling when more concurrent tasks would cause an OOM situation, but we don't currently have any good way to know if slowing down task scheduling would be beneficial.

> Just to avoid misunderstandings, expensivejob never terminates and therefore I would expect the scheduler to just stop assigning tasks after each worker got one task. I suppose that might happen by accident with the time estimation thing you mentioned.

I think it would be reasonable to allow one "unknown" task per worker/processor.

> Would distributed work stealing rebalance out the tasks among processors in case more processors are added or some group of tasks finishes faster?

Yeah, we would allow any processor to steal work from neighboring processors, or from another worker's processors. So newly-added workers would be able to start stealing immediately.
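
The "one unknown task per worker" rule mentioned above could look roughly like the following. This is a minimal sketch under stated assumptions, not Dagger's scheduler code: `estimates`, `unknown_running`, and `can_assign` are hypothetical names, and the numbers are made up.

```julia
# Hypothetical sketch; `estimates`, `unknown_running`, and `can_assign` are
# invented for illustration and are not part of Dagger's API.

# Signatures that have already been timed, mapped to an estimated run time in seconds.
estimates = Dict{Any,Float64}(Tuple{typeof(sum),Vector{Int}} => 0.01)

# Per worker: how many running tasks have a signature that is still untimed.
unknown_running = Dict(2 => 0, 3 => 1)

# A timed signature is always assignable in this sketch. An untimed signature
# is only assignable to a worker that is not already running an untimed task.
function can_assign(worker::Int, sig, estimates, unknown_running)
    haskey(estimates, sig) && return true
    return get(unknown_running, worker, 0) == 0
end

# Stands in for Tuple{typeof(expensivejob), Int} from the example in this issue.
new_sig = Tuple{typeof(identity),Int}

ok_on_2 = can_assign(2, new_sig, estimates, unknown_running)  # worker 2 runs no untimed task
ok_on_3 = can_assign(3, new_sig, estimates, unknown_running)  # worker 3 already runs one
```

With a gate like this, the remaining untimed tasks would stay unassigned until a first timing is available, which is also what would leave work for newly added workers.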

DrChainsaw commented 2 years ago

> Yeah, we would allow any processor to steal work from neighboring processors, or from another worker's processors. So newly-added workers would be able to start stealing immediately.

Sounds great!

I wouldn't mind having an options knob to control the amount of worker overload, something like options.allow_overwork(worker, thunk), but I leave it up to you if you want to just close this.

jpsamaroo commented 2 years ago

Can I ask why you want this, other than to ensure that new workers get utilized effectively? I'm thinking that I might not want to expose too many performance-focused knobs for Sch, because it can make maintenance and testing more difficult. It might also encourage users to tweak the setting for optimal performance on their problem, when really it would be better to file an issue so we can make the scheduler better understand their needs.

DrChainsaw commented 2 years ago

Yeah, that's perfectly fair.

I was mainly thinking that it is probably very difficult to know from the first run what type of job it is and that users can give a hint. Probably more of a thinking out loud kinda thing rather than a suggestion.

DrChainsaw commented 2 years ago

I can probably spend some time on a PR for work stealing.

However, I'm unsure where it falls between "requires careful restructuring of the scheduler" and "just be careful with locks and performance". I suppose if it leans more towards the former, a PR might do more harm than good (e.g. CR driven development).

jpsamaroo commented 2 years ago

Worker-local work stealing can definitely be done in a straightforward manner by just using some locks and channels; the only complexity is satisfying the task's scope when stealing work (maybe if the scope is anything more specific than ProcessScope we don't allow it to participate in work stealing for right now?).

Cross-worker stealing is more complex, because it's very expensive to steal just a single task over the network. But regardless of the performance implications, we could consider implementing a per-worker channel as a backlog of tasks which we estimate will take more than 1 second before they're executed by the local worker (and we have this information available in do_task, where this should happen).

Implementing the former would be a huge win alone, so if you want to start with that, I think that would be excellent. We can figure out the latter once worker-local stealing is functional.
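
A minimal sketch of the worker-local case, assuming one lock-protected queue per processor. `StealQueue`, `push_work!`, `take_local!`, `steal!`, and `next_work!` are hypothetical names rather than Dagger functions, and the scope check discussed above is left out entirely.

```julia
# Hypothetical sketch of lock-based work stealing between local processors.
# None of these names exist in Dagger; they are made up for illustration.
struct StealQueue
    lock::ReentrantLock
    items::Vector{Any}
end
StealQueue() = StealQueue(ReentrantLock(), Any[])

# Enqueue a work item on one processor's queue.
function push_work!(q::StealQueue, item)
    lock(q.lock) do
        push!(q.items, item)
    end
    return nothing
end

# The owner takes the oldest item from its own queue (`nothing` if it is empty).
function take_local!(q::StealQueue)
    lock(q.lock) do
        isempty(q.items) ? nothing : popfirst!(q.items)
    end
end

# A thief takes the newest item from somebody else's queue (`nothing` if it is empty).
function steal!(q::StealQueue)
    lock(q.lock) do
        isempty(q.items) ? nothing : pop!(q.items)
    end
end

# Processor `me` prefers its own queue and only steals when that queue is empty.
function next_work!(queues::Vector{StealQueue}, me::Int)
    item = take_local!(queues[me])
    item === nothing || return item
    for victim in eachindex(queues)
        victim == me && continue
        item = steal!(queues[victim])
        item === nothing || return item
    end
    return nothing
end

# All six items were assigned to processor 1; processor 2 starts out idle.
queues = [StealQueue(), StealQueue()]
foreach(i -> push_work!(queues[1], i), 1:6)
stolen = next_work!(queues, 2)  # queue 2 is empty, so this steals from queue 1
own = next_work!(queues, 1)     # processor 1 still takes from the front of its own queue
```

Taking from opposite ends is just one common choice to keep the owner and thieves from competing for the same item; a real implementation would also have to decide what an idle processor does when every queue is empty.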

DrChainsaw commented 2 years ago

Alright, I started to look into the scheduler code but I realized that I'm not sure what type of work stealing we are talking about here.

The current scheduler seems to fire off all available thunks at once. Once they are scheduled as Tasks by Julia's native task scheduler, is there really any way to steal them? At least it seems like that would require poking into the internals of the scheduler. Is that what I should be looking into?

If we instead look at stealing Thunks, is there really anything to steal? My understanding is that the queue of Thunks to schedule is just ComputeState.ready which is one global queue for all workers. The Thunks are assigned to workers at the same instant that they are started as Tasks, right? Is that something I should be looking into changing?

[Edit]: A third option is when the scheduler realizes a processor is at max capacity. Here I guess we could make the process available for work stealing.

jpsamaroo commented 2 years ago

There are actually two scheduling stages available in my mind: the first is in schedule!, which assigns tasks to their initial processors; however, a scheduled task is not actually executing until do_task is called with the constructed comm tuple (it's a bundle of data describing everything necessary to execute the task; holding this object means you "own" the ability to execute the task).

This comm tuple is key to creating a new scheduling stage: in do_tasks (https://github.com/JuliaParallel/Dagger.jl/blob/5f315d1eb3936494b6241cc72ee744351cd3907c/src/sch/Sch.jl#L928-L950), instead of immediately calling do_task for each comm tuple, we can instead add a layer of queuing of comm tuples based on channels that will allow us to start a background task to pull work out of the channels and process it. (Currently we just launch all tasks at once (via https://github.com/JuliaParallel/Dagger.jl/blob/5f315d1eb3936494b6241cc72ee744351cd3907c/src/sch/Sch.jl#L940-L942), but we can change behavior to take only a few tasks at a time based on some heuristic.) These channels can then be stolen from to provide work stealing.

This implies that the initial processor assigned by schedule! is really just a hint; if the scheduler does a good job at assigning tasks to processors evenly, then work stealing would not happen and processors will execute the work that they were originally assigned. But if the scheduler did a bad job and oversubscribes a processor, work stealing (in do_tasks) allows workers to even things out on their own.
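
A rough sketch of that extra queuing layer, assuming the comm tuples can be modelled as zero-argument closures. `start_consumers` and `max_running` are hypothetical names, and the real do_tasks code is not reproduced here.

```julia
# Hypothetical sketch: work items are queued in a Channel and only
# `max_running` background tasks pull from it, instead of launching
# everything at once. Closures stand in for the comm tuples.
function start_consumers(queue::Channel, results::Channel; max_running::Int=2)
    return map(1:max_running) do _
        # Iterating a Channel blocks until an item arrives and ends once the
        # channel is closed and drained.
        @async for work in queue
            put!(results, work())
        end
    end
end

queue = Channel{Function}(Inf)   # unbounded backlog of pending work
results = Channel{Any}(Inf)

for i in 1:10
    put!(queue, () -> i^2)       # enqueue instead of executing immediately
end
close(queue)                     # no more work; buffered items can still be taken

consumers = start_consumers(queue, results; max_running=2)
foreach(wait, consumers)
close(results)

collected = sort!(collect(results))
```

Any task that holds a reference to `queue` can call `take!` on it, so in this picture a thief is simply another consumer of the same channel.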

DrChainsaw commented 2 years ago

> but we can change behavior to take only a few tasks at a time based on some heuristic.

Could this be the same kind of heuristic as in my option three above? The mechanism does not seem to work but it seems like it should be fixable. Or is it a known dead end?

One last thing which is not fully clear to me: If we are to delay the start of Thunks, what are the main advantages of doing so in do_tasks rather than in schedule!?

Is it that in many/most cases it will start right away and now we have transferred everything needed to the right worker? If we delay in schedule! the remotecall_wait (and possibly other validation-y code) in fire_tasks! may cause significant latency.

Is another objective to avoid making the 'main' scheduler a bottleneck and distribute the thunk delaying heuristic and work stealing mechanism to the workers? With work stealing I suppose one should utilize the fact that there is an 'intended worker' for a given Thunk so that work stealing in the typical case is an exception.

It seems rather tempting to delay thunks by just rescheduling them and let the existing machinery do its thing, but depending on the above it might not be the right thing to do.

DrChainsaw commented 1 year ago

I tried to get work stealing from Dagger 0.17 to do its thing on the MWE by adding new workers, but it doesn't seem to happen:

julia> addprocs(5, exeflags="--project")

julia> @everywhere begin using Dagger, Distributed
                          function expensivejob(i)
                              @info "Started task $i on  $(myid())"
                              while true end
                          end
                      end

julia> ctx = Dagger.Context(workers());

julia> ts = delayed(vcat)([delayed(expensivejob)(i) for i in 1:200]...);

julia> res = @async collect(ctx, ts);

      From worker 8:     [ Info: Started task 1 on  8
      From worker 7:    [ Info: Started task 2 on  7
      From worker 9:    [ Info: Started task 3 on  9
      From worker 11:   [ Info: Started task 5 on  11
      From worker 10:    [ Info: Started task 4 on  10

julia> newps = addprocs(5, exeflags="--project");

julia> @everywhere newps begin using Dagger, Distributed
                          function expensivejob(i)
                              @info "Started task $i on  $(myid())"
                              while true end
                          end
                      end

julia> addprocs!(ctx, newps)
1

Is there something extra needed for it to kick in?

jpsamaroo commented 1 year ago

Yeah it's still not really possible to do this for already-submitted jobs because, even though not all tasks will run simultaneously, the scheduler still assigns them to a worker immediately. Implementing cross-worker work stealing would fix this, but has questionable performance implications (cross-thread workstealing is so cheap and profitable that it's fine to do it all the time).