chapel-lang / chapel

a Productive Parallel Programming Language
https://chapel-lang.org

Add a WorkQueue Abstraction using 'Collections' API #10485

Open LouisJenkinsCS opened 6 years ago

LouisJenkinsCS commented 6 years ago

As per issue #10472, opened by @ian-bertolacci, there is a desire for an explicit work queue in Chapel; for the time being, Collections can fill that role. In the example below, any 'collection' can be used thanks to Chapel's rich generics. The example aliases a privatized type, here a replicated variable from the ReplicatedVar module, so that the user can send a termination signal. The WorkQueue iterator exploits the current parallel iterators to make yielding elements easy; technically it can loop forever if work is continually added, but it should make a good prototype. I have not tested or profiled the code below, and I don't have time right now to go further, but it may serve as a starting point.

use ReplicatedVar;
use DistributedBag;

type KillSignal = [rcDomain] bool;

// 'ref' intent: rcReplicate writes to the replicated variable
inline proc kill(ref sig : KillSignal) {
   rcReplicate(sig, true);
}

iter WorkQueue(collection, killSignal) { halt("No serial work queue iterator..."); }

iter WorkQueue(collection, killSignal, param tag : iterKind) where tag == iterKind.standalone {
   coforall loc in Locales do on loc {
      coforall tid in 1..here.maxTaskPar {
         while !rcLocal(killSignal) {
            // Collection.remove() returns (hasWork, work) pairs
            var (hasWork, work) = collection.remove();
            if hasWork then yield work;
            else chpl_task_yield();
         }
      }
   }
}

var bag = new DistBag(int);
var killSignal : KillSignal;
bag.addBulk(1..10);
forall work in WorkQueue(bag, killSignal) {
   writeln(work);
   if work == 10 then bag.addBulk(11..100);
   if work == 100 then kill(killSignal);
}
writeln("Done");

LouisJenkinsCS commented 6 years ago

To detect automatically when the queue is empty after all elements have been processed, we can use something like termination detection plus distributed reductions via privatization. I already have an abstraction for this that increments local counters rather than issuing network-atomic RDMA operations against a single locale, so it should be extremely fast. The only other issue is incrementing the counters automatically when the user adds to the collection; I guess the user can be required to do so, and I already have a privatized structure that handles the communication burden. For example, the user-provided loop would change to...

var bag = new DistBag(int);
var terminationDetector = new TerminationDetector();
var killSignal : KillSignal;
bag.addBulk(1..10);
terminationDetector.started(10);
forall work in WorkQueue(bag, killSignal) {
   writeln(work);
   if work == 10 { 
      terminationDetector.started(90);
      bag.addBulk(11..100);
   }
   if work == 100 then kill(killSignal);
   terminationDetector.finished();
}

The WorkQueue iterator could then take the termination detector as an argument and use it to determine when it should stop.
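To make the started/finished counting concrete, here is a minimal single-locale sketch. It is not the privatized abstraction mentioned in this comment: `SimpleTerminationDetector` and `hasTerminated` are hypothetical names, the counters are plain atomics on one locale rather than per-locale counters combined by a distributed reduction, and the code assumes recent Chapel syntax.

```chapel
// Hypothetical single-locale stand-in for the TerminationDetector used above.
// The real abstraction is described as privatized, with per-locale counters.
class SimpleTerminationDetector {
  var tasksStarted : atomic int;   // units of work announced via started()
  var tasksFinished : atomic int;  // units of work completed via finished()

  proc started(n : int = 1) { tasksStarted.add(n); }
  proc finished(n : int = 1) { tasksFinished.add(n); }

  // Read 'finished' before 'started': 'started' only grows, so equal values
  // mean every announced unit had already completed at the first read.
  proc hasTerminated() : bool {
    const f = tasksFinished.read();
    const s = tasksStarted.read();
    return s == f;
  }
}

var detector = new SimpleTerminationDetector();
detector.started(10);
coforall i in 1..10 {
  // Announce child work before finishing the parent, as in the loop above.
  if i == 10 then detector.started(5);
  detector.finished();
}
// Finish the five extra units announced by task 10.
coforall j in 1..5 do detector.finished();
// All 15 announced units have finished at this point.
writeln(detector.hasTerminated());
```

The ordering matters: a task must call started() for any work it adds before calling finished() for itself, otherwise the counters can momentarily match while work is still pending. A WorkQueue iterator that takes such a detector could test `hasTerminated()` in its `while` condition in place of, or alongside, the kill signal.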

LouisJenkinsCS commented 6 years ago

@mppf Could you assign this to me so I can get to it once I actually get the chance? I'll add it to my TODO list unless someone else takes it on first, maybe as part of the Collections overhaul.

mppf commented 6 years ago

@LouisJenkinsCS - I've assigned it to you, but I'm not sure whether the DistBag-based WorkQueue will meet @ian-bertolacci's needs. I think it would be worthwhile for the two of you to talk about that, at least a little bit.

LouisJenkinsCS commented 6 years ago

So from what I can tell: the work queue requires the ability to send work to other locales, which can be done with an on statement and can be aggregated once the aggregation library is released. Work stealing can be disabled in a hackish way, but he did say that it works fine. Besides providing the aggregation library (I'm still waiting on the information release for it, and I'm writing a paper about it right now), I need to know what else is wrong with it.

ian-bertolacci commented 6 years ago

So I have a very early implementation that wraps all of this into its own class. It's looking like it might solve both the parallel and distributed queues (if the overhead on a single locale is not too bad).

I think I'm the most immediate 'need-er' of a work queue, so let's do this: I will work on my queue for toposort, so that we aren't trying to solve a more general problem. If we decide that its design is more generally useful, we can cannibalize it into something more package-y then.

LouisJenkinsCS commented 6 years ago

Okay, but I warn you that wrapping the DistributedBag in a class will negate the benefits you get from privatization if you access it as a class field from other locales. There is a reason why it is passed around as a record (a record that forwards to a privatized instance). Let me know if you have any problems.

mppf commented 6 years ago

Sounds like @ian-bertolacci is trying for a quick solution specific to his problem. I hope that this issue is at least eventually updated with a link to whatever he comes up with, so we can talk about making it part of the library, or figuring out what the library would look like.