JuliaSIMD / ThreadingUtilities.jl

Utilities for low overhead threading in Julia.
MIT License

Add some docs and/or examples? #16

Open DilumAluthge opened 3 years ago

chriselrod commented 3 years ago

Probably after fixing this issue: https://github.com/chriselrod/ThreadingUtilities.jl/issues/17

For now, my plan would be to have a few libraries like Octavian and LoopVectorization use the low level interface. The test suite has about the simplest supported example at the moment.

chriselrod commented 3 years ago

Currently, how it works is that you need a function that takes a Ptr{UInt} as its only argument. You then need to get a Ptr to that function. To run it on worker tid, check the state of THREADPOOL[tid], making sure it is either SPINning or WAITing. If it's one of those, store that pointer at pointer(THREADPOOL[tid]) + sizeof(UInt). If it was SPINning, set the state to TASK, so it knows there is more work to do. If it was WAITing, set it to LOCK and then wake it up.

The Ptr{UInt} that your function receives is this same pointer. The pointer is to a Ref{NTuple{32,UInt}}. ThreadingUtilities only uses the first 2 of the 32: the first is the state (i.e. SPIN/WAIT/TASK/LOCK/STUP), and the second is the pointer to the function getting called. That leaves 30 remaining slots that it doesn't use, which you can store things into. This lets your function load from these to receive its real arguments. You'll have to store the arguments it needs when you store the function pointer, before setting the state to TASK or waking the thread.
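A rough sketch of that buffer layout, using a plain Ref{NTuple{32,UInt}} in place of a real THREADPOOL entry (the slot numbering follows the description above; the example function and argument value are illustrative, not the package's actual definitions):

```julia
# Simulate one worker's 32-UInt buffer: slot 1 = state, slot 2 = function
# pointer, slots 3..32 are free for arguments.
buf = Ref(ntuple(_ -> zero(UInt), Val(32)))

# A function matching the required signature; a real one would load its
# arguments starting at p + 2sizeof(UInt).
myfunc(p::Ptr{UInt}) = nothing
fptr = @cfunction(myfunc, Cvoid, (Ptr{UInt},))

GC.@preserve buf begin
    p = Base.unsafe_convert(Ptr{UInt}, buf)
    unsafe_store!(p, UInt(fptr), 2)   # slot 2: the function pointer
    unsafe_store!(p, UInt(42), 3)     # slot 3: a first (fake) argument
    # A real launch would now flip slot 1 (the state) to TASK, or to LOCK
    # followed by waking the worker, as described above.
end
```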

chriselrod commented 3 years ago

A complicating problem is that this has to be GC friendly, meaning we need GC.@preserves in a lot of places while working with these pointers.

So to automate this, what do we need? We can get pointers to anonymous functions, so one possibility is to take arbitrary code and create an anonymous function that doesn't use the ptr:

anonfunc = _ -> begin
    # do stuff
end

We could then create a cfunc = @cfunction($anonfunc, Cvoid, (Ptr{UInt},)), and GC.@preserve it while using Base.unsafe_convert(Ptr{Cvoid}, cfunc) as our Ptr.
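A minimal sketch of that pattern, with the worker side replaced by a direct ccall so the whole thing runs in-process (the counter payload is illustrative; the $ form of @cfunction is what closures require, and is also what makes this allocate):

```julia
# A hypothetical payload: bump a counter through a captured Ref.
counter = Ref(0)
anonfunc = _ -> (counter[] += 1; nothing)  # ignores the Ptr{UInt} argument

cfunc = @cfunction($anonfunc, Cvoid, (Ptr{UInt},))  # closures need $anonfunc
GC.@preserve cfunc begin
    fptr = Base.unsafe_convert(Ptr{Cvoid}, cfunc)
    # fptr is what we would store into the worker's buffer; here we just
    # ccall it directly to show the round trip works.
    ccall(fptr, Cvoid, (Ptr{UInt},), C_NULL)
end
counter[]  # now 1
```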

This will allocate memory in general, so we could get fancier and try to turn any arrays into stridedpointers, and then GC.@preserve those arrays as well.

At this point, I'm inclined to just do this as part of the LoopVectorization.@avx macro, as it will support everything needed to automate it.

DilumAluthge commented 3 years ago

I agree that having this functionality in the @avx macro will be nice.

However, I do think it would be good to also have a high level interface for using this package in settings that do not involve loops.

Here's a sketch of what I'm envisioning. It intentionally tries to mimic the user interface of the built-in threading.

So I think it would be nice for ThreadingUtilities to export a macro @spawnfn (for "spawn function") that does all this work for you. Example usage might be:

@spawnfn foo(ptr)

@spawnfn will:

  1. Check that the expression passed to it is a call expression
  2. Check that the call has exactly zero keyword arguments
  3. Check that the call has exactly one positional argument
  4. Do all the other stuff you describe in your previous comments, with the end result being that the function is sent to a thread (any thread) to run.
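The validation in steps 1-3 could be sketched as follows (check_spawn_expr is a hypothetical helper, not part of the package):

```julia
# Hypothetical front-end check for a @spawnfn-style macro: the expression
# must be a call with exactly one positional argument and no keywords.
function check_spawn_expr(ex)
    ex isa Expr && ex.head === :call ||
        throw(ArgumentError("@spawnfn expects a call expression"))
    args = ex.args[2:end]
    any(a -> a isa Expr && a.head in (:kw, :parameters), args) &&
        throw(ArgumentError("@spawnfn does not accept keyword arguments"))
    length(args) == 1 ||
        throw(ArgumentError("@spawnfn expects exactly one positional argument"))
    ex.args[1], args[1]  # the callee and its single argument
end

check_spawn_expr(:(foo(ptr)))  # → (:foo, :ptr)
```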

And of course we don't want this to block. That is, if I have this:

@spawnfn foo(ptr1)
@spawnfn foo(ptr2)
@spawnfn foo(ptr3)
@spawnfn foo(ptr4)

Each of these lines would return immediately.

Actually, it probably makes sense for @spawnfn to return something that we can wait on. So e.g. I could do this:

thing1 = @spawnfn foo(ptr1)
thing2 = @spawnfn foo(ptr2)
thing3 = @spawnfn foo(ptr3)
thing4 = @spawnfn foo(ptr4)

wait(thing1)
wait(thing2)
wait(thing3)
wait(thing4)

So we'd spawn all four functions at once, send them off to execute, and then wait until all four have completed.

One note: if necessary, we can tell users that they need to do GC.@preserve ptr (or whatever they need to do) inside the body of their function. I'm fine asking them to do that, since it's relatively easy. But I'd like the @spawnfn macro to do pretty much everything else.

Would this kind of high level convenience interface be possible? Then users would be able to use ThreadingUtilities.jl really easily.

chriselrod commented 3 years ago

I think it might work if the thing @spawnfn returns is what needs to be GC.@preserved, and we then let the wait function do the preserving.

DilumAluthge commented 3 years ago

The follow-up question: if that's not possible, what is the simplest high-level convenience API that is possible for this package?

DilumAluthge commented 3 years ago

Does ThreadingUtilities require that you tell it which thread to run the function on?

DilumAluthge commented 3 years ago

Because it would be nice to support "please run this on any thread other than thread 1; I don't care which thread".

Of course, we should probably also support "please run this on a specific thread number".

DilumAluthge commented 3 years ago

I think it might work if the thing @spawnfn returns is what needs to be GC.@preserved, and we then let the wait function do the preserving.

Instead of returning the thing that needs to be preserved itself, can we return a struct that wraps it?

I.e. if x needs to be preserved, can we return ThreadingUtilities.PreserveWrapper(x) where we define:

struct PreserveWrapper{T}
    x::T
end

Then we define the method Base.wait(w::PreserveWrapper{T}) where {T}, and inside that method we do the preserving.

This lets us use the Base wait function without needing to commit type piracy.
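A sketch of that wrapper, with a stand-in wait body: the tid field and the polling it would do are assumptions about what the real implementation would carry, not existing package API.

```julia
# Sketch: wrap whatever must stay rooted, plus the worker id to wait on.
struct PreserveWrapper{T}
    x::T        # the object(s) that must not be GC'd while the work runs
    tid::Int    # worker thread id to wait on
end

function Base.wait(w::PreserveWrapper)
    GC.@preserve w begin
        # A real implementation would block here until THREADPOOL[w.tid]'s
        # state leaves TASK; keeping w rooted keeps w.x rooted too.
    end
    w.x
end

wait(PreserveWrapper(Ref(1), 2))  # returns the preserved Ref
```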

chriselrod commented 3 years ago

Does ThreadingUtilities require that you tell it which thread to run the function on?

It could loop through them until it finds one that is either WAIT or SPIN.
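A sketch of that "any available thread" scan; the state encoding here is illustrative, not the package's actual one:

```julia
# Scan per-worker states and pick the first worker that is WAITing or
# SPINning; return nothing if every worker is busy.
@enum WorkerState SPIN WAIT TASK LOCK STUP

function find_available_worker(states::Vector{WorkerState})
    for (tid, s) in enumerate(states)
        (s === WAIT || s === SPIN) && return tid
    end
    return nothing
end

find_available_worker([TASK, LOCK, WAIT, SPIN])  # → 3
```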

chriselrod commented 3 years ago

Then we define the method Base.wait(w::PreserveWrapper{T}) where {T}, and inside that method we do the preserving.

Yes, that's what I meant. It'd also need to carry the id to wait on.

DilumAluthge commented 3 years ago

Does ThreadingUtilities require that you tell it which thread to run the function on?

It could loop through them until it finds one that is either WAIT or SPIN.

Perfect. So @spawnfn will do that. And then we can also have @spawnon tid f(ptr) which will run on thread number tid.

chriselrod commented 3 years ago

I'll probably work on this fairly soon. Turns out only @threads sets the threading region, meaning all the checks in Octavian and LoopVectorization.vmapt! don't actually work.

While I think you can safely nest @spawn inside ThreadingUtilities threads, the other way around doesn't work. And the other way around would be more important, since it's the lower overhead threads you'd prefer to be nested inside.

So I'll add some convenient functions that make it a little easier to use these in outer loops, and also a means of checking which tasks are available to spawn on. I'd like some sort of breadth-first thing to work. E.g., maybe someone has 16 cores and wants to fit 4 chains of MCMC. Then they should be able to do these in parallel, and give each chain 4 cores/threads to run on.

Roger-luo commented 3 years ago

I'm trying to use this for subspace_mul!, since I find Threads.@threads somehow slows down the original @avx implementation 4x on my machine. I'm trying to understand what the launch function does, but I'm having some trouble. Does it store the function pointer for the thread and somehow load that function? And what are the requirements on the input closure/function?

chriselrod commented 3 years ago

I'm trying to use this for subspace_mul!, since I find Threads.@threads somehow slows down the original @avx implementation 4x on my machine.

https://github.com/JuliaSIMD/LoopVectorization.jl/issues/221

I'll try and get back to implementing a Threads.@threads alternative in CheapThreads.jl this weekend.

I'm trying to understand what the launch function does, but I'm having some trouble. Does it store the function pointer for the thread and somehow load that function? And what are the requirements on the input closure/function?

When you load ThreadingUtilities, the __init__() function starts tasks pinned to each thread running this loop. When launch sets a task's associated pointer to TASK, that task will call _call, which loads the stored function pointer and ccalls it, passing the pointer.

So the requirements on the function f passed to launch are that it

  1. Stores the function pointer you want _call to ccall at offset p + sizeof(UInt).
  2. Stores any arguments you want this function pointer to have access to, starting at offset p + 2sizeof(UInt). The total buffer size is ThreadingUtilities.THREADBUFFERSIZE bytes, currently 512. Meaning you can store up to 512 - 2sizeof(UInt) bytes. Best to pass large amounts of data by reference. The staticarrays test, for example, shows passing SVectors by pointer.

The requirements on the function pointer are that it returns nothing and takes a single argument, a Ptr{UInt} (the pointer you just stored to). It should load all the data it needs from that Ptr, and then call the actual function you want to be calling. It can also store return values to that pointer, which you can then load.

Be careful to GC.@preserve everything that needs protecting from the GC.

Roger-luo commented 3 years ago

Stores any arguments you want this function pointer to have access to, starting at offset p + 2sizeof(UInt). The total buffer size is ThreadingUtilities.THREADBUFFERSIZE bytes, currently 512. Meaning you can store up to 512 - 2sizeof(UInt) bytes. Best to pass large amounts of data by reference. The staticarrays test, for example, shows passing SVectors by pointer.

From what I understand, the function implemented using MulStaticArray is the function that gets executed by each thread? And all the actual variables need to be stored into the thread pointer in the form of pointers? For the static array case, I think because the shape information is in the type, you only need to pass the pointer to the thread; but for a Matrix type, will I need to separately pass the shape and the pointer to the matrix memory to the thread pointer to use them later?


Is there a way to get the current thread id inside the thread function? It seems the static array test doesn't use the thread id inside the MulStaticArray function.

chriselrod commented 3 years ago

From what I understand, the function implemented using MulStaticArray is the function that gets executed by each thread?

Yes. Parametric functors are often a convenient way of specifying some aspects of behavior. In that example, P is Tuple{Ptr{SVector{16,Float64}},Ptr{SVector{16,Float64}}}. ThreadingUtilities.load(ptr, P, offset) then loads an object of type P from ptr.

For the static array case, I think because the shape information is in the type, you only need to pass the pointer to the thread; but for a Matrix type, will I need to separately pass the shape and the pointer to the matrix memory to the thread pointer to use them later?

I added an example using SMatrix instead of SVector to the tests.

Basically, making P a Tuple{Ptr{SMatrix{4,5,Float64,20}},Ptr{SMatrix{4,5,Float64,20}}} instead is all it takes to load/store an SMatrix. You don't have to pass the type as one Tuple. You could instead make multiple calls, e.g.

offset, a = ThreadingUtilities.load(p, A, 2*sizeof(UInt))
_, b = ThreadingUtilities.load(p, B, offset)

where P === Tuple{A,B}. The important thing is that the second argument to load determines the type it loads.
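For a plain Matrix, one option is to store the data pointer plus the dimensions, and rebuild a view with unsafe_wrap on the other side. A self-contained sketch (no threading; a local NTuple Ref stands in for the worker's buffer, and the slot layout is illustrative):

```julia
# Store a Matrix as (pointer, nrows, ncols) in a buffer, then reconstruct
# a shared-memory view from those three UInts.
A = [1.0 2.0; 3.0 4.0]
buf = Ref(ntuple(_ -> zero(UInt), Val(8)))

GC.@preserve A buf begin
    p = Base.unsafe_convert(Ptr{UInt}, buf)
    unsafe_store!(p, UInt(pointer(A)), 1)
    unsafe_store!(p, UInt(size(A, 1)), 2)
    unsafe_store!(p, UInt(size(A, 2)), 3)

    # On the receiving side:
    ptrA = Ptr{Float64}(unsafe_load(p, 1))
    m, n = Int(unsafe_load(p, 2)), Int(unsafe_load(p, 3))
    B = unsafe_wrap(Array, ptrA, (m, n))  # shares memory with A
    B[1, 1] += 10.0
end
A[1, 1]  # now 11.0
```

As always, the array must be GC.@preserved for as long as the raw pointer is in use.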

Is there a way to get the current thread id inside the thread function? It seems the static array test doesn't use the thread id inside the MulStaticArray function.

Threads.threadid()-1. You can also manually pass in any information through the pointer, e.g. in the setup function

function setup_mul_svector!(p, tid, y::Base.RefValue{T}, x::Base.RefValue{T}) where {T}
    py = Base.unsafe_convert(Ptr{T}, y)
    px = Base.unsafe_convert(Ptr{T}, x)
    fptr = mul_staticarray_ptr(py, px)
    offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
    offset = ThreadingUtilities.store!(p, (py,px), offset)
    ThreadingUtilities.store!(p, tid, offset)
    nothing
end
@inline function launch_thread_mul_svector(tid, y, x)
    ThreadingUtilities.launch(tid, tid, y, x) do p, tid, y, x
        setup_mul_svector!(p, tid, y, x)
    end
end

and then

function (::MulStaticArray{P})(p::Ptr{UInt}) where {P}
    offset, (ptry,ptrx) = ThreadingUtilities.load(p, P, 2*sizeof(UInt))
    _, tid = ThreadingUtilities.load(p, Int, offset)
    unsafe_store!(ptry, unsafe_load(ptrx) * 2.7)
    nothing
end