JuliaLang / julia

The Julia Programming Language
https://julialang.org/
MIT License
45.73k stars 5.48k forks source link

Analyzing threaded reduce. Is this behavior expected? #34470

Open tkf opened 4 years ago

tkf commented 4 years ago

I tried to analyze this super simple threaded map-reduce

function tmapreduce(f, op, xs; init)
    if length(xs) <= 2
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = Threads.@spawn tmapreduce(f, op, right; init = init)
    return op(tmapreduce(f, op, left; init = init), fetch(task))
end

(see below for the full code)

When I run this function with expensive f and four threads (my laptop has 4 physical cores), I see this profile

demo

The vertical bars represent the interval for executing f. X axis is the array index. Colors are Threads.threadid().

I see that all of the bars occur as consecutive two bars like this

   |
  |

with the same color (= thread). Is this due to that the task is popped off from the head of local task deque or something? However, other than this property, I don't see any structure in the way the tasks are scheduled. (It's just an artifact: https://github.com/JuliaLang/julia/issues/34470#issuecomment-576904686)

Is this an expected behavior? From the description of "depth-first scheduling", I was (maybe too naively) expecting something like this

ideal

This behavior would be very useful to implement parallel findfirst and alike if this is possible. I think this also helps cache locality for reading the data (though it could be bad for writing due to false sharing). I think I can try to encourage (or even enforce) scheduling like this by using Event to order the scheduling. But I wondered if some kind of improvements for this respect are planned for Julia's scheduler.

I'm using Julia 1.5.0-DEV.67. Julia 1.3.1 produces similar pictures.

Full code

function tmapreduce(f, op, xs; init)
    if length(xs) <= 2
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = Threads.@spawn tmapreduce(f, op, right; init = init)
    return op(tmapreduce(f, op, left; init = init), fetch(task))
end

function tmapreduce_profile(f, op, xs; init)
    starts = resize!(typeof(time_ns())[], length(xs))
    stops = resize!(typeof(time_ns())[], length(xs))
    threadids = zeros(Int, length(xs))
    value = tmapreduce(op, 1:length(xs); init = init) do i
        threadids[i] = Threads.threadid()
        starts[i] = time_ns()
        y = f(xs[i])
        stops[i] = time_ns()
        return y
    end
    return (value = value, starts = starts, stops = stops, threadids = threadids)
end

demo(; workload = 10_000, length = 64) =
    tmapreduce_profile(+, 1:length; init = 0) do x
        a = 1.0
        for _ in 1:workload
            a = sin(a)
        end
        x
    end

demo()
result = demo()

using Plots

function plotresult(result)
    t0 = minimum(result.starts)
    starts = (result.starts .- t0) ./ 1e6
    stops = (result.stops .- t0) ./ 1e6
    threadids = result.threadids
    return plot(
        [1:length(starts) 1:length(starts)]',
        [starts stops]',
        label = "",
        color = [threadids threadids]',
        ylabel = "Time [ms]",
        xlabel = "Array Index",
    )
end

plt = plotresult(result)
tro3 commented 4 years ago

@tkf - cool plot. If you don't mind, I'll use a similar concept for the ThreadPools logger in addition to the textual activity log I have now. (If you do mind, no worries.)

IIUC, @spawn schedules the task on a random thread and puts it into the queue (at the C-level) of that thread before execution. So if 100 @spawns are called, all 100 tasks are scheduled across threads before they have started. They might end up evenly spread (likely), but they might end up on one single thread, in theory. (Update - the assignment is based on the modulus of the current time, so while the starting thread is random, they should evenly divide as time progresses. This might explain why the jobs are assigned to threads in pairs in your example.)

I'd love for the C-level implementation of @spawn to have the queued behavior we've been discussing. Then you'd have the pre-divided @threads option and the queued @spawn option to choose from. Add an option to prevent primary thread operation on either, and we have it all.

As for the pairing, I have no clue. I see a similar pairing on my machine, but I have a far worse grouping for some reason:

Figure_1

tro3 commented 4 years ago

If it helps, I rewired your demo to use the activity log, and here are the results from a couple of runs (times are in ns, columns are threads 1:4 from left to right, number is active index, and * means more than one job active)

julia> include("script.jl")
      0.000    -    -    -    -
 100000.000    1    -    -    -
 200000.000    1   33   17    -
 300000.000    2   34   17    -
 400000.000    -   34   17    -
 500000.000   35    9   18    -
 600000.000   36    9   18    -
 700000.000   36   10   49    -
 800000.000    -   51   49    -
 900000.000    5   51   50    -
1000000.000    6   52   50    -
1100000.000    6   52   50    -
1200000.000   41   25    3    -
1300000.000   41   25    3    -
1400000.000   42   26    -    -
1500000.000    -   45    4    -
1600000.000   47   45    4    -
1700000.000   47   46   19   29
1800000.000   48   46   19   30
1900000.000    -   57   20   30
2000000.000   13   58   21   59
2100000.000   13   58   21   59
2200000.000   14   37   22   60
2300000.000    -   37   22   31
2400000.000   39   38   15   31
2500000.000   40    -   16   32
2600000.000   40   23   16   32
2700000.000    -   24   43   61
2800000.000   27   24   43   62
2900000.000   28   63   44   62
3000000.000   28   63   53    7
3100000.000   11   64   53    7
3200000.000   11   55   54    8
3300000.000   12   55   54    -
3400000.000   12   56    -    -
3500000.000    -   56    -    -
3600000.000    -    -    -    -
3700000.000    -    -    -    -

julia> include("script.jl")
      0.000    -    -    -    -
 100000.000    -   33   49   41
 200000.000    -   33   49   41
 300000.000    -   33  *53   42
 400000.000    -   34  *53   42
 500000.000    -   34  *39   61
 600000.000    -   59  *40   62
 700000.000    -   59  *40   62
 800000.000    -   60  *54   45
 900000.000    -   60  *54   45
1000000.000    -   60  *57   46
1100000.000    -   35  *58   46
1200000.000    -   35  *58   63
1300000.000    -   35  *50   64
1400000.000    -   36  *50   64
1500000.000    -   36  *37   55
1600000.000    1   47  *38   55
1700000.000    2   47  *38   56
1800000.000    2   48   51   17
1900000.000    -    9   51   17
2000000.000   43    9   52   18
2100000.000   44   10    -   18
2200000.000   44   10   25   27
2300000.000    -    5   26    -
2400000.000    -    6   26   28
2500000.000   13    6   21    3
2600000.000   14   23   21    3
2700000.000   14   23   22    4
2800000.000    -   24    7    4
2900000.000   29   31    7   15
3000000.000   30   31    8   16
3100000.000   30   32    8   16
3200000.000    -   32   11   19
3300000.000    -    -   12   19
3400000.000    -    -   12   20
3500000.000    -    -    -    -
3600000.000    -    -    -    -
3700000.000    -    -    -    -
tkf commented 4 years ago

@tro3 Please go ahead and use the concept or the code snippet or whatever. Having more visualizations in ThreadPools would be great.

Then you'd have the pre-divided @threads option

Just be clear, I don't want to do scheduling manually and/or statically. I just want to know the characteristics of the current/future scheduler and if there is a way to "nudge" the scheduler so that my program works nicely with it. For example, I don't want static scheduling as I'd like to have threaded reduce that can be nested.

(But I agree that it'd be great have customizable scheduler interface as discussed in RFC: Make an abstraction for custom task parallel schedulers (GSoC proposal) - Internals & Design - JuliaLang)

If it helps, I rewired your demo to use the activity log,

Interesting. Do you mind sharing the script? Also, does it do I/O while logging? I specifically try to not yield to the scheduler so that it does not affect the task scheduling too much.

tkf commented 4 years ago

So it looks like I can encourage the scheduler to process earlier elements first by scheduling "bottom first":

function tmapreduce(f, op, xs; init, _task = nothing)
    if length(xs) <= 2
        _task === nothing || schedule(_task)
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = @task begin
        _task === nothing || schedule(_task)
        tmapreduce(f, op, right; init = init)
    end
    task.sticky = false
    return op(tmapreduce(f, op, left; init = init, _task = task), fetch(task))
end

You can see that the bars are aligned somewhat diagonally and also there is not much loss of parallelism (though I'm just eyeballing):

demo

tro3 commented 4 years ago

Just be clear, I don't want to do scheduling manually and/or statically.

Totally get it. You want (I think) a queueing scheduler, but at the C-level. This is how I wish @spawn worked, rather than pre-assigning.

Here is the hacked code using the ThreadPools logger.


import ThreadPools: showactivity, LogItem

function tmapreduce(f, op, xs; init)
    if length(xs) <= 2
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = Threads.@spawn tmapreduce(f, op, right; init = init)
    return op(tmapreduce(f, op, left; init = init), fetch(task))
end

function tmapreduce_profile(f, op, xs; init)
    starts = resize!(typeof(time_ns())[], length(xs))
    stops = resize!(typeof(time_ns())[], length(xs))
    threadids = zeros(Int, length(xs))

    io = open("tkflog.txt","w")
    logger = Channel{LogItem}(16*1024) do c
        for item in c
            job, tid, st, t = item
            write(io, "$job $tid $st $t\n")
        end
    end

    t0 = time_ns()
    value = tmapreduce(op, 1:length(xs); init = init) do i
        threadids[i] = Threads.threadid()
        put!(logger, (i, threadids[i], 'S', time_ns()-t0))
        y = f(xs[i])
        put!(logger, (i, threadids[i], 'P', time_ns()-t0))
        return y
    end

    close(io)
    return (value = value, starts = starts, stops = stops, threadids = threadids)
end

demo(; workload = 10_000, length = 64) =
    tmapreduce_profile(+, 1:length; init = 0) do x
        a = 1.0
        for _ in 1:workload
            a = sin(a)
        end
        x
    end

demo()
result = demo()

showactivity("tkflog.txt", 100000)
tkf commented 4 years ago

I see that all of the bars occur as consecutive two bars like this

   |
  |

with the same color (= thread).

Actually this is nothing to do with the scheduler and it's due to length(xs) <= 2; i.e., the basecase processes two elements.

tkf commented 4 years ago

This makes scheduling perfectly "diagonal"

function tmapreduce(f, op, xs; init, _task = nothing)
    if length(xs) <= 2
        _task === nothing || schedule(_task)
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = @task begin
        if _task === nothing
            waiter = nothing
        else
            waiter = @task schedule(_task)
            waiter.sticky = false
        end
        tmapreduce(f, op, right; init = init, _task = waiter)
    end
    task.sticky = false
    return op(tmapreduce(f, op, left; init = init, _task = task), fetch(task))
end

demo

It seems that the scheduler still can do load balancing with uneven workload:

demo

The last figure is generated with

@@ -36,7 +36,7 @@ end
 demo(; workload = 10_000, length = 64) =
     tmapreduce_profile(+, 1:length; init = 0) do x
         a = 1.0
-        for _ in 1:workload
+        for _ in 1:rand(1:workload)
             a = sin(a)
         end
         x
tkf commented 4 years ago

@tro3 Thanks for sharing the script. You have code like put!(logger, (i, threadids[i], 'S', time_ns()-t0)). Did you notice if it changes the scheduling? IIUC put! can yield to the scheduler which may decide to start new task.

tro3 commented 4 years ago

@tkf - very cool results! Yes, I'm sure it can mess with the scheduling, but I am not using @spawn and the jobs I am tracking are not as fast individually as the ones you have here. I'm going to fold in your cool graphical tool - that will give better insight than the textual graph I have now. Then I can try a with/without logging comparison and see the impact. If it does show a big difference, I'll have to move to an in-memory version of the same.

tro3 commented 4 years ago

No question about the logger effect on the scheduler. With the logger, there is a 25% extra time added, and some clear noise that varies from run to run. Here is the same script without the logger, then with:

Figure_1

Figure_1

I'll have to look at an in-memory implementation, if possible. Otherwise just have to live with the overhead, since it is just for tuning anyway - comparisons between strategies should still be believable.

tkf commented 4 years ago

but I am not using @spawn and the jobs I am tracking

Ah, I see. I forgot that you are controlling the scheduling so there is less to worry.

With the logger, there is a 25% extra time added, and some clear noise that varies from run to run.

Thanks for trying it out with and without the logger. Nice to see that it skews the result (nice, in the sense that my caution was meaningful).

I'll have to look at an in-memory implementation, if possible.

I guess you can do something like

struct Record
    time::UInt
    ...  # things you want to record
end

const RECORDS = Vector{Record}[]

function __init__()
    Threads.resize_nthreads!(RECORDS, Record[])
end

function pushrecord(r::Record)
    push!(RECORDS[Threads.threadid()], r)
end

? Although you can't look at the log records online. I guess sizehint!ing the Vector{Record}s can also be also useful.

tkf commented 4 years ago

BTW, making the waiter task sticky like this

@@ -11,7 +11,6 @@ function tmapreduce(f, op, xs; init, _task = nothing)
             waiter = nothing
         else
             waiter = @task schedule(_task)
-            waiter.sticky = false
         end
         tmapreduce(f, op, right; init = init, _task = waiter)
     end

"breaks" the parallelism; i.e., there only two parallel tasks even though I have four threads:

demo

Is it expected? As waiter is not doing any computation, I thought sticky task was enough.

tro3 commented 4 years ago

@tkf thanks for the in-memory tips. I put a Vector{LogItem} in the pool and it seems to work. I can just write to the file on closing the pool. I need to check the overhead, but it can’t be as bad as the Channel.

On sticky, I have no clue. It is interesting though, that you only have two “blue” tasks. I bet those are the primary thread. What I can’t explain is why all three background threads are running after that, but only two at a time.

tkf commented 4 years ago

Yeah, the blue one is the primary thread because the first items are always processed by the primary thread. It'd be nice if core devs can demystify the two-at-the-time scheduling (and why it has to be non-sticky).

tkf commented 4 years ago

Actually, I don't know why I put the waiter thing. This is enough:

function tmapreduce(f, op, xs; init, _task = nothing)
    if length(xs) <= 2
        _task === nothing || schedule(_task)
        return mapreduce(f, op, xs; init = init)
    end
    mid = length(xs) ÷ 2
    left = @view xs[1:mid]
    right = @view xs[mid+1:end]
    task = @task begin
        tmapreduce(f, op, right; init = init, _task = _task)
    end
    task.sticky = false
    return op(tmapreduce(f, op, left; init = init, _task = task), fetch(task))
end
tkf commented 4 years ago

I implemented this trick in Transducers.jl. Indeed, it makes early termination with reduce much faster. Quoting https://github.com/tkf/Transducers.jl/pull/183#issuecomment-577040873

ID time ratio memory ratio
... ... ...
["findfirst", "n=1000", "reduce", "basesize=128"] 0.48 (5%) :white_check_mark: 0.45 (1%) :white_check_mark:
["findfirst", "n=1000", "reduce", "basesize=256"] 0.70 (5%) :white_check_mark: 0.57 (1%) :white_check_mark:
["findfirst", "n=1000", "reduce", "basesize=512"] 0.45 (5%) :white_check_mark: 0.37 (1%) :white_check_mark:
["findfirst", "n=400", "reduce", "basesize=128"] 0.96 (5%) 0.93 (1%) :white_check_mark:
["findfirst", "n=400", "reduce", "basesize=256"] 0.92 (5%) :white_check_mark: 0.86 (1%) :white_check_mark:
["findfirst", "n=400", "reduce", "basesize=512"] 0.85 (5%) :white_check_mark: 0.70 (1%) :white_check_mark:
["findfirst", "n=500", "reduce", "basesize=128"] 0.58 (5%) :white_check_mark: 0.41 (1%) :white_check_mark:
["findfirst", "n=500", "reduce", "basesize=256"] 0.22 (5%) :white_check_mark: 0.17 (1%) :white_check_mark:
["findfirst", "n=500", "reduce", "basesize=512"] 0.32 (5%) :white_check_mark: 0.22 (1%) :white_check_mark:
... ... ...
tro3 commented 4 years ago

@tkf - interestingly enough, if you pull sticky=false from that last solution, it never leaves the primary. It seems like sticky stays on the same thread. In your earlier version, you made waiter sticky, but the main task was not, so you got two threads. (Each thread tried to keep one job local and spawn the other elsewhere, but sticky can't go elsewhere, so it just queued up behind the original.) If both waiter and the main task were sticky, it would stay on the primary.

Just a theory