JuliaFolds2 / OhMyThreads.jl

Simple multithreading in Julia
https://juliafolds2.github.io/OhMyThreads.jl/
MIT License

Docs: explainer for translating loops to reductions #35

Open MasonProtter opened 7 months ago

MasonProtter commented 7 months ago

Reductions are scary and confusing to beginners, but with a quick explainer, they're actually not that bad. It'd be good to have a docs section that explains the following:

A general sequential for loop (one without early termination or anything special like that) can be thought of as writing

state = some_initial_value
for x in itr
    state = some_function(state, x)
end
return state

This can be straightforwardly translated to foldl as foldl(some_function, itr; init=some_initial_value), or, if we want to make it look more like the for loop, we can write:

foldl(itr; init=some_initial_value) do state, x
    some_function(state, x)
end
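
As a concrete instance (a small sketch, with + standing in for some_function), summing a collection fits this pattern exactly:

function sum_loop(itr)
    state = 0
    for x in itr
        state = state + x
    end
    return state
end

sum_loop([1, 2, 3, 4])  # 10

# the same computation as a foldl:
foldl([1, 2, 3, 4]; init = 0) do state, x
    state + x
end  # 10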

This foldl is much easier to parallelize than the for loop because there is a very structured way in which state is passed from one iteration to the next.

The state is not some non-local variable that was initialized outside the loop and then implicitly modified. Instead, foldl operates on a two-argument function that takes the previous state as its first argument and an element of the collection as its second, and computes an updated state from them.

There are then two conditions for a foldl to be safely turned into a treduce:

1) some_function must be associative, meaning some_function(x, some_function(y, z)) == some_function(some_function(x, y), z). This is important because for the reduction to be parallelizable, you need to be able to re-partition the data and change the order in which you traverse it.

2) The init (if provided) should be a "neutral element" that doesn't change the answer if it's re-initialized. That is, if you write treduce(op, itr; init), it must be legal to rewrite it as op(treduce(op, itr[begin:end÷2]; init), treduce(op, itr[end÷2+1:end]; init)) (see the sketch below).

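For instance (a minimal sketch; + and the range here are just for illustration): addition is associative, and init = 0 is a neutral element for +, so the reduction can be split and recombined freely, whereas a non-neutral init like 1 gets counted once per chunk:

using OhMyThreads: treduce

itr = collect(1:100)

# associative op, neutral init: safe to parallelize
treduce(+, itr; init = 0)  # 5050, same as foldl(+, itr; init = 0)

# the neutral init survives re-partitioning:
+(treduce(+, itr[begin:end÷2]; init = 0),
  treduce(+, itr[end÷2+1:end]; init = 0))  # also 5050

# a non-neutral init is unsafe: each chunk re-adds it, so the
# answer would depend on how the data happened to be partitioned
foldl(+, itr; init = 1)                       # 5051
# ...but the two-chunk split above with init = 1 gives 5052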

The functions tmapreduce (and treducemap) take an additional argument: a unary function f which is applied to each element of the iterator exactly once (unlike the reducing function). So if you write

tmapreduce(f, op, itr; init)

that becomes

state = init
for x in itr
    state = op(state, f(x))
end
return state
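
Concretely (a quick sketch, using sum of squares, i.e. f = abs2 and op = +):

using OhMyThreads: tmapreduce

# abs2 runs exactly once per element; + combines the partial states
tmapreduce(abs2, +, 1:5; init = 0)  # 55

# sequential equivalent:
# state = 0; for x in 1:5; state = +(state, abs2(x)); end; state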

The function tforeach (and also the Base.Threads.@threads macro) is a special case where there is no explicit state being passed and updated from loop iteration to loop iteration; you just execute some function for its side effects (like printing, or, if you're feeling risky, mutating an array).

Hence, tforeach(f, itr) is actually just implemented as

# the reducer just threads the (trivial) state through; f runs once per element
tforeach(f, itr; kwargs...) = treducemap((state, _) -> state, f, itr; kwargs..., init=nothing)
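
For example (a tiny sketch; the print order is nondeterministic since tasks run concurrently):

using OhMyThreads: tforeach

# side effects only; the overall return value is `nothing`
tforeach(1:5) do i
    println("processing $i on thread $(Threads.threadid())")
end
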
carstenbauer commented 6 months ago

I like things like this. I was thinking of having an "Explanations" section in the docs that, without being a tutorial or reference guide, gives a bit of background information. I would also copy over some notes from ThreadPinning.jl, e.g. about the interaction between BLAS and Julia threads and the (potential) importance of pinning threads, so that the OhMyThreads docs become a generally useful resource about multithreading in Julia that you can point people to.

ctkelley commented 6 months ago

This is a very good idea. It would help the novices if you could give a few concrete and simple examples. For example, can you express a matrix-vector product as a reduction?

carstenbauer commented 6 months ago

> For example, can you express a matrix-vector product as a reduction?

FWIW, https://gist.github.com/carstenbauer/7f8147c5b0a469ef1f86904fd77cc5a9

TLDR:

# sequential (respect column-major order)
function matvec_col!(y, A, x)
    fill!(y, zero(eltype(y)))
    for c in axes(A, 2)
        for r in axes(A, 1)
            @inbounds y[r] += A[r, c] * x[c]
        end
    end
    return y
end
matvec_col(A, x) = matvec_col!(similar(x, size(A, 1)), A, x)

# parallel reduction (macro API)
# (assumes: using OhMyThreads, Base.Threads for @tasks, @set, chunks, nthreads)
function matvec_col_attasks_chunks(A, x; ntasks = nthreads())
    @tasks for c_idcs in chunks(axes(A, 2); n = ntasks)
        @set chunking = false
        @set reducer = +
        y_local = @views matvec_col(A[:, c_idcs], x[c_idcs])
    end
end

# parallel reduction (function API)
function matvec_col_tmapreduce_chunks(A, x; ntasks = nthreads())
    tmapreduce(+, chunks(axes(A, 2); n = ntasks); chunking = false) do c_idcs
        y_local = @views matvec_col(A[:, c_idcs], x[c_idcs])
    end
end
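
(The timings below were presumably produced with something like BenchmarkTools' @btime; a sketch of such a harness, with N and the random inputs as assumptions rather than the gist's exact setup:)

using BenchmarkTools

N = 10_000
A = randn(N, N); x = randn(N)

@btime matvec_col($A, $x)
@btime matvec_col_attasks_chunks($A, $x)
@btime matvec_col_tmapreduce_chunks($A, $x)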

Timings (for me):

# N = 10_000
# nthreads() = 10
# matvec_col
#   53.024 ms (2 allocations: 78.17 KiB)
# matvec_col_attasks_chunks
#   19.793 ms (109 allocations: 1.46 MiB)
# matvec_col_tmapreduce_chunks
#   19.601 ms (109 allocations: 1.46 MiB)

# N = 1000
# nthreads() = 10
# matvec_col
#   115.778 μs (1 allocation: 7.94 KiB)
# matvec_col_attasks_chunks
#   36.344 μs (90 allocations: 156.91 KiB)
# matvec_col_tmapreduce_chunks
#   36.967 μs (90 allocations: 156.91 KiB)