ctrueden opened this issue 5 years ago
I see major disadvantages with the suggested solutions:

- `ForkJoinPool`: it's very hard to understand how it works. But there's a trick: use parallel streams instead. They will use the `ForkJoinPool` automatically.
- `ExecutorService` has the disadvantage of not telling you how many threads there are.
- `ExecutorService` blocks for nested parallelization.
- `numThreads` has the disadvantage that it doesn't allow specifying or reusing an `ExecutorService` or thread pool.
- Using a `ForkJoinPool` or `ExecutorService` always carries the overhead of using threads.

My idea is to not use `numThreads`, `ExecutorService` or `ForkJoinPool` directly, but instead use an interface:
```java
public interface MultiThreadSetting
{
	// returns true if multi-threading should be used
	boolean useMultiThreading();

	// returns 1 for single-threading, or e.g. 4 * availableProcessors() for
	// multi-threading; you may also manually specify this value
	int suggestNumberOfTasks();

	// executes the action for each value: sequentially for single-threading,
	// in parallel for multi-threading
	<T> void forEach( Collection< T > values, Consumer< T > action );
}
```
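For illustration, here is a minimal sketch of how a sequential implementation of this interface could look and be used. Everything except the `MultiThreadSetting` interface itself (which is repeated so the sketch compiles standalone) is hypothetical, not code from the PR:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class SingleThreadExample
{
	// The proposed interface, repeated so this sketch is self-contained.
	interface MultiThreadSetting
	{
		boolean useMultiThreading();
		int suggestNumberOfTasks();
		<T> void forEach( Collection<T> values, Consumer<T> action );
	}

	// Hypothetical sequential implementation: everything runs in the calling thread.
	static class SingleThreadSetting implements MultiThreadSetting
	{
		public boolean useMultiThreading() { return false; }
		public int suggestNumberOfTasks() { return 1; }
		public <T> void forEach( Collection<T> values, Consumer<T> action )
		{
			for ( T value : values )
				action.accept( value ); // sequential, no threads involved
		}
	}

	// An "algorithm" that is agnostic of whether it runs single- or multi-threaded.
	static int sum( MultiThreadSetting threading, Collection<Integer> values )
	{
		AtomicInteger total = new AtomicInteger();
		threading.forEach( values, total::addAndGet );
		return total.get();
	}

	public static void main( String[] args )
	{
		System.out.println( sum( new SingleThreadSetting(), Arrays.asList( 1, 2, 3, 4 ) ) ); // prints 10
	}
}
```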
See PR https://github.com/imglib/imglib2/pull/250
For this interface I have three implementations:

- one that uses `ForkJoinPool`;
- one that uses an `ExecutorService`;
- a sequential one that uses neither `ForkJoinPool` nor `ExecutorService`.

Thanks a lot @maarzt for digging into this issue!
I am 100% on board with using our own interface. However, the tricky question is: where shall this interface live? I want to use it in both SciJava Common and ImgLib2. Do you have a preference? @tpietzsch, what do you think? Maybe a new `scijava-threads` component?

And @maarzt: have you looked in detail at the SciJava `ThreadService`? How would you propose we update it to slot in nicely with this new approach?
@maarzt some quick comments:

`int suggestNumberOfTasks()` is not good enough. The decision on how many subtasks to create should be made at the point where `MultiThreadSetting` is used, taking into account `availableProcessors` but also other things that might only be locally available. I would rather put in a method `int getParallelism()` like in `ForkJoinPool`. (This is more or less a replacement for `availableProcessors`, which might be too global.)
`void forEach( Collection< T > values, Consumer< T > action );`

I don't like this either. It makes cases where you care about the result of the action very cumbersome: there you have to bake both the input and a holder for the result into `T`. I also don't get why you would force the same action to be used for each value. What happens if you have different actions? I assume calling `forEach(a, b); forEach(c, d);` would always block `forEach(c, d)` until `forEach(a, b)` has completed (I guess it would have to). So you would need to also multiplex multiple actions into `action`. I like a `Future<V> submit(Callable<V>)` interface much better. It is also what is done everywhere in the JDK...
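The contrast can be sketched with the JDK's own `ExecutorService`, which already has this submit-style shape; the `squares` helper here is illustrative, not part of any proposal:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExample
{
	// With Future<V> submit(Callable<V>), results come back directly;
	// no need to bake a result holder into the element type T.
	static List<Integer> squares( ExecutorService pool, List<Integer> inputs ) throws Exception
	{
		List<Future<Integer>> futures = new ArrayList<>();
		for ( int x : inputs )
			futures.add( pool.submit( () -> x * x ) ); // each task returns its own result
		List<Integer> results = new ArrayList<>();
		for ( Future<Integer> f : futures )
			results.add( f.get() ); // block only when collecting
		return results;
	}

	public static void main( String[] args ) throws Exception
	{
		ExecutorService pool = Executors.newFixedThreadPool( 2 );
		System.out.println( squares( pool, List.of( 1, 2, 3 ) ) ); // [1, 4, 9]
		pool.shutdown();
	}
}
```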
I would not use the name `MultiThreadSetting` for something that is used to execute stuff. "Setting" implies (at least to me) a passive, probably immutable object.
@ctrueden @axtimwalde @hanslovsky @StephanPreibisch
We discussed this and @maarzt came up with a pretty fantastic (IMO) proposal.
You can have a look at it here (`multi-threading` branch).
The gist of it is that parallelization settings are available as a `ThreadLocal` inside algorithms, without passing any `ExecutorService` or similar argument in. In an algorithm you just call, for example,

```java
Parallelization.forEach( elements, operation );
```

and it runs parallel or sequential according to context.
This can mean either:

- running `operation` on each element sequentially in the current thread, or
- running `operation` on each element in parallel in the common `ForkJoinPool` (or whichever pool you are currently in).

The decision is made by the caller at the top level, by running

```java
Parallelization.singleThreaded( () -> myAlgorithm() );
```

or

```java
Parallelization.multiThreaded( () -> myAlgorithm() );
```

Note that you would (almost?) never call this inside any algorithm. This is done outside, by the user.
It ties in nicely with existing stuff. For example, if you call

```java
elements.parallelStream().forEach( operation );
```

then that also triggers parallelization inside `operation`.
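The real API lives in the linked branch; as a rough, hypothetical illustration of how a `ThreadLocal` context switch of this kind can work (the method bodies here are simplified stand-ins, not the actual implementation):

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a ThreadLocal-based parallelization context.
public class ParallelizationSketch
{
	private static final ThreadLocal<Boolean> MULTI_THREADED =
			ThreadLocal.withInitial( () -> true );

	// Caller-side switch: run a task with parallelization disabled,
	// restoring the previous setting afterwards.
	static void singleThreaded( Runnable task )
	{
		Boolean previous = MULTI_THREADED.get();
		MULTI_THREADED.set( false );
		try { task.run(); }
		finally { MULTI_THREADED.set( previous ); }
	}

	// Algorithm-side call: consults the thread-local context.
	static <T> void forEach( List<T> elements, Consumer<T> operation )
	{
		if ( MULTI_THREADED.get() )
			elements.parallelStream().forEach( operation ); // common ForkJoinPool
		else
			elements.forEach( operation ); // sequential, current thread
	}
}
```

The algorithm itself never sees an `ExecutorService` argument; the caller's context determines the execution mode.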
It supports recursive tasks like `ForkJoinPool` does. See https://github.com/maarzt/sandbox/blob/multi-threading/src/main/java/sandbox/parallization/SumAlgorithm.java for an example that recursively splits a RAI by dimension.
@ctrueden This brings us to a remaining question: the "user" would be, for example, the `OpService`, `CommandService`, etc. If I call `ops.bla()`, is that run with the caller's parallelization settings, or does the `OpService` have its own threads and parallelization settings? (For `OpService` it should probably run in the calling thread, but for other services it might not be so clear.)

Any thoughts, comments?
Hi @maarzt & @tpietzsch,
First: thanks very much for trying. Getting abstracted parallelization right is perhaps impossible. It has its uses, but for the general case I have yet to stumble upon a good solution.

The first major issue is that algorithms have different memory requirements and different contention issues. There are of course groups of algorithms that share such properties (e.g. filtering an image), and therefore there can be well-suited solutions for them.
An example: suppose I am running a memory-heavy algorithm that uses e.g. 25% of my desktop's RAM. I can only effectively run 4 threads. But some steps within each parallel processing branch can be done with more threads, so each would use 1/4th of all available threads. When done with this subprocess, the main, memory-heavy thread would resume (for example, moving on to the next 3D image to process in a 4D series).
An example of a subtask would be copying a resulting image into an `ArrayImg`. That's why I'd need to specify the number of threads in e.g. `ImgUtil.copy`.
If the abstracted parallelization framework assumes that allocating as many free threads as possible to a task is a good idea, the above will fail, or result in suboptimal performance.
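The two-level structure described above can be sketched with plain JDK primitives; `processAll` and `processOneImage` are hypothetical stand-ins for the memory-heavy per-image step and its cheap parallel subtask, not ImgLib2 API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class NestedParallelism
{
	// Outer level: at most 4 memory-heavy per-image tasks run at once.
	static long processAll( int numImages ) throws Exception
	{
		ExecutorService outer = Executors.newFixedThreadPool( 4 );
		try
		{
			List<Future<Long>> futures = new ArrayList<>();
			for ( int i = 0; i < numImages; i++ )
				futures.add( outer.submit( NestedParallelism::processOneImage ) );
			long total = 0;
			for ( Future<Long> f : futures )
				total += f.get(); // resume the outer task when subwork is done
			return total;
		}
		finally
		{
			outer.shutdown();
		}
	}

	// Stand-in for one memory-heavy step that internally fans out
	// over the common ForkJoinPool for a cheap parallel subtask.
	static long processOneImage()
	{
		return IntStream.range( 0, 1000 ).parallel().asLongStream().sum();
	}

	public static void main( String[] args ) throws Exception
	{
		System.out.println( processAll( 4 ) ); // 4 * 499500 = 1998000
	}
}
```

Note that the outer cap of 4 is decided by the memory budget, not by `availableProcessors()`, which is exactly the kind of locally available information a framework cannot know.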
Another example: suppose there are multiple data structures that have to be accessed for every iteration in the loop. If one naively uses all available threads to process the next available loop index, contention is high: all threads try to access all data structures at once. Instead, one can chunk the data structures and have each thread process a chunk that the others never access. Is chunking included in the abstract parallelization? For some cases it would be preferable, because it is cumbersome and could be done automatically via e.g. `Views.interval`. But in many cases it cannot.
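A minimal sketch of the manual-chunking pattern, using a plain list instead of image data structures (the `chunkedSum` helper is illustrative only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedProcessing
{
	// Manual chunking: each worker owns a contiguous slice of the data,
	// so threads never contend on the same elements.
	static long chunkedSum( List<Integer> data, int numChunks ) throws Exception
	{
		ExecutorService pool = Executors.newFixedThreadPool( numChunks );
		try
		{
			int chunkSize = ( data.size() + numChunks - 1 ) / numChunks;
			List<Future<Long>> futures = new ArrayList<>();
			for ( int start = 0; start < data.size(); start += chunkSize )
			{
				List<Integer> chunk =
						data.subList( start, Math.min( start + chunkSize, data.size() ) );
				futures.add( pool.submit(
						() -> chunk.stream().mapToLong( Integer::longValue ).sum() ) );
			}
			long total = 0;
			for ( Future<Long> f : futures )
				total += f.get();
			return total;
		}
		finally
		{
			pool.shutdown();
		}
	}
}
```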
All that said: "Those who think something can't be done should not stop those who are doing it from doing it." So carry on and let's see how far this goes. Looking forward to the results.
Specific comments on the code: statically instantiating both an `ExecutorService` and a `ForkJoinExecutorService` by default is not a good idea. These should be instantiated only if/when needed.
On the "problem" that an `ExecutorService` doesn't tell you how many threads it is set up to use: that's only true for the excessively abstract interface `ExecutorService`. Consider using e.g. `ThreadPoolExecutor` directly, which is the parent class of even the `ScheduledThreadPoolExecutor`. With `ThreadPoolExecutor`, you have methods like `getPoolSize` and others that let you find out how many `Thread`s are available, how many are currently active, etc.
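For example (standard JDK API, nothing project-specific):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class PoolIntrospection
{
	public static void main( String[] args )
	{
		// Executors.newFixedThreadPool returns a ThreadPoolExecutor under the
		// hood, which, unlike the bare ExecutorService interface, exposes its
		// configuration and current state.
		ThreadPoolExecutor pool =
				(ThreadPoolExecutor) Executors.newFixedThreadPool( 4 );
		System.out.println( pool.getCorePoolSize() ); // 4 (configured size)
		System.out.println( pool.getPoolSize() );     // 0 (threads are created on demand)
		System.out.println( pool.getActiveCount() );  // number of tasks currently running
		pool.shutdown();
	}
}
```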
> @ctrueden This brings us to a remaining question

Let's discuss when I visit in June. Added to my list for that week.
> @ctrueden
> Let's discuss when I visit in June. Added to my list for that week.

Sounds good.
@acardona I agree that a general solution is basically impossible. There will always be cases where it is advisable to roll your own multithreading solution. This proposal is more about the 80% of cases where you just want to chunk your image and get on with it... :-)
Specifically, the first scenario you mentioned would be possible with a `ForkJoinPool` (resp. the proposed `Parallelization`). You could just split the whole task into 4 parts, and those parts could then submit many subtasks and wait for completion. It gets ugly if you do not want to simply split into 4 equal parts, for example because it cannot be predicted whether the parts would run for about the same time. Even then, it could probably be shoehorned in somehow, but it would be ugly, and a special-case solution would be preferable.
Regarding the second scenario: chunking is not included in the abstract parallelization. The idea is that you call `Parallelization.getParallelism()`, split accordingly into subtasks, and then submit those. The problem of contention on "hot" data structures is not solved by the proposed framework. It is only about specifying which tasks can run concurrently and which dependencies exist between tasks.

How the tasks are distributed onto threads is not controllable. To some extent this is even true if you implement a special-case solution, unless you have full control over which machine this runs on and what else is running at the same time. If you create 8 threads, you can only hope that they will actually all be running at the same time. Anyway, the proposal offers no solution there. You could do striping of data structures as you proposed, but per task, not per thread. This will work but is probably over-cautious.
> Specific comments on the code: statically instantiating both an `ExecutorService` and a `ForkJoinExecutorService` by default is not a good idea. These should be instantiated only if/when needed.
Hmm, why? These are two super light-weight objects. Neither the `SequentialExecutorService` nor the `ForkJoinExecutorService` creates threads or holds references to any data. (`SequentialExecutorService` just immediately runs everything in the calling thread; `ForkJoinExecutorService` submits to the common pool, or to whatever pool the calling thread is running in.)
Many of the implementations in imglib2-algorithm have an `int numThreads` parameter for controlling how many threads to spawn. Others take an `ExecutorService`. Accepting an `ExecutorService` is more flexible and would mesh better with SciJava Common's `ThreadService` and therefore ImageJ Ops. See also imagej/imagej-ops#599. On the other hand, an `ExecutorService` alone is not enough to identify the intended number of threads to use when partitioning the task.

Note that currently, we often infer `numTasks` from `Runtime.availableProcessors()` by default, which is not a good practice in general, because multiple tasks may be ongoing simultaneously, which can result in more threads than processors.

@tpietzsch points out:

- With `ForkJoinPool` there is a "parallelism level" that corresponds to this roughly. We should consider using `ForkJoinPool` throughout.
- `ForkJoinPool` extends `ExecutorService`, so it would at least be backwards compatible in some ways.
- `ForkJoinPool.getParallelism()` could replace / augment `numTasks`.
- `ForkJoinPool` supports work-stealing, which would be important if submitted tasks spawn new subtasks for whose completion they wait. This allows handing a pool down through algorithms that parallelize in chunks and for each chunk call another algorithm that parallelizes internally. (With handing down an `ExecutorService`, that wouldn't work.)
- There is a `ForkJoinPool.commonPool()` that is used by streams etc. We could fall back to this if there is no user-provided pool.

Based on a chat in gitter.
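The work-stealing point can be illustrated with a standard `RecursiveTask` run on the common pool; this is a generic JDK sketch (summing a range of longs), not ImgLib2 code:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// A task that waits on its own subtasks does not deadlock in a ForkJoinPool:
// an idle worker steals the forked half instead of blocking a pool slot.
// With a fixed-size ExecutorService handed down through nested algorithms,
// the same pattern can exhaust the pool and hang.
public class RecursiveSum extends RecursiveTask<Long>
{
	private final long from, to; // sums the half-open range [from, to)

	RecursiveSum( long from, long to ) { this.from = from; this.to = to; }

	@Override
	protected Long compute()
	{
		if ( to - from <= 1000 ) // small enough: compute directly
		{
			long sum = 0;
			for ( long i = from; i < to; i++ )
				sum += i;
			return sum;
		}
		long mid = ( from + to ) / 2;
		RecursiveSum left = new RecursiveSum( from, mid );
		RecursiveSum right = new RecursiveSum( mid, to );
		left.fork();                          // may be stolen by an idle worker
		return right.compute() + left.join(); // wait for the forked half
	}

	public static void main( String[] args )
	{
		// Fall back to the common pool, just like parallel streams do.
		long sum = ForkJoinPool.commonPool().invoke( new RecursiveSum( 0, 1_000_000 ) );
		System.out.println( sum ); // 499999500000
	}
}
```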