
Proposal: Aggregation and buffering of asynchronous remote tasks via `begin on` #9727

Closed. LouisJenkinsCS closed this issue 6 years ago.

LouisJenkinsCS commented 6 years ago

An additional, and possibly welcome, feature would be enabling the user to buffer multiple remote tasks. Unlike a normal on statement, where the invoking task must await the completion of the remote task, a begin on statement is a way to send a "fire-and-forget" kind of active message. Since the task that spawns the remote task does not directly wait for it, there is no real need to process it immediately, and for performance reasons it can be buffered.

Buffering can improve regular-access and bulk-synchronous applications, and even more so irregular-access applications like graphs. An example of what I see as ideal would be:

var x : atomic int;
var y : atomic int;
on Locales[1] {
   forall i in 1 .. 100 {
      begin on Locales[0] {
         x.fetchAdd(i);
         y.fetchAdd(i);
      }
   }
   chpl_flushAsync();
}

What I would expect is that the begin on statements would be performed as batches of tasks: instead of spawning 100 individual remote tasks on Locale 0, the runtime would buffer them and send all 100 tasks at once, significantly reducing overall communication. The method chpl_flushAsync() would be a runtime function that flushes any buffered messages that still need to be sent.

This can be implemented in the communication layer in terms of the individual executeOn and fastExecuteOn messages: uGNI has a limit of 1MB per Active Message, and I believe it would be possible to squeeze a sequence of smaller active messages into one larger active message to be handled on the receiving node. The receiving node could then create tasks for these active messages by submitting them to the tasking layer, similar to how a coforall would. This is of course a naive approach, but I believe it would be immediately effective and could be squeezed into the next release.
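
To make the intended batching concrete, here is a minimal module-level sketch of the semantics (not the runtime implementation; TaskDescriptor, processTask, and flushBatch are all invented for illustration). It ships a whole batch with a single on statement, standing in for one large active message, and recreates the tasks on the receiving side much as a coforall would:

record TaskDescriptor {
  var i : int;                     // payload for the remote task
}

var x, y : atomic int;             // live on Locale 0, as in the example above

// Stands in for whatever the body of the begin on would have done.
proc processTask(t : TaskDescriptor) {
  x.fetchAdd(t.i);
  y.fetchAdd(t.i);
}

// One on statement delivers the whole batch; the receiving side then
// creates the tasks locally, similar to how a coforall would.
proc flushBatch(batch : [] TaskDescriptor, target : locale) {
  on target {
    const localBatch = batch;      // one bulk transfer, not |batch| messages
    coforall t in localBatch do
      processTask(t);
  }
}

on Locales[numLocales-1] {         // run from some other locale, as above
  var batch : [1..100] TaskDescriptor;
  forall (t, i) in zip(batch, 1..) do t.i = i;  // fill the buffer locally
  flushBatch(batch, Locales[0]);                // a single communication
}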

Pinging @marcinz - Supervisor.
Pinging @mppf and @bradcray - Potential language design and feature request, needed for performance reasons.
Pinging @gbtitus - Would it be possible to implement something like this easily in the communication layers?

Edit:

Adding a more irregular example where buffering would be required for good performance.

var arr1 : [someDistribution] int;
var arr2 : [someOtherDistribution] int;
proc f(i,j) {
  begin on arr1[i] do somethingTo(arr1[i]);
  begin on arr2[j] do somethingTo(arr2[j]);
}

forall i in 1..1024 {
   forall j in 1 .. 2048 {
      f(i,j);
   }
}
mppf commented 6 years ago

I have two questions:

  1. How would we distinguish between a begin on that works as it does now (i.e. unbuffered) and the new buffered ones?
  2. Have you given any thought to how we would handle a similar pattern but where the body of the begin on takes a lock and does some data structure manipulation and then releases the lock? Ideally we'd want processing the buffered work to happen without taking the lock per begin on.
gbtitus commented 6 years ago

Pinging @ronawho , who is already thinking about a bulk-spawn capability.

Yes, it would be easy to have a single AM start multiple remote tasks, keeping in mind of course that if one spawns 100 tasks on a single remote node with (for example) 24 cores, the limiting factor will be the task execution there, not the communication. The situation @ronawho has been specifically looking at is the one for coforall loc in Locales do on loc { ... }, where we need to fire 1 task on each of the remote locales. At scale (1000s of nodes, for example), this could produce a measurable difference in some applications.

LouisJenkinsCS commented 6 years ago

How would we distinguish between a begin on that works as it does now (i.e. unbuffered) and the new buffered ones?

For now, possibly by using a pragma...

pragma "aggregate"
begin on Locales[0] do something();

Although I'm not entirely sure why you would not want this behavior by default, assuming that the buffers do get flushed periodically based on time. This is a begin on, so the user can't expect it to complete with any determinism anyway without using some kind of barrier or another way of being notified that it has finished.

Have you given any thought to how we would handle a similar pattern but where the body of the begin on takes a lock and does some data structure manipulation and then releases the lock? Ideally we'd want processing the buffered work to happen without taking the lock per begin on.

I believe that for the short-term implementation, it shouldn't attempt to coalesce operations just yet. Just the reduction in communication is enough of an improvement for the initial implementation. I'll let @marcinz weigh in once he isn't too busy.

mppf commented 6 years ago

It seems to me that it would be correct for begin on to be buffered always, provided that we "flush" the buffer on a memory fence. E.g.

var x$: sync int;
begin on Locales[1] {
   ... do something in the task ...
  x$ = 1;
}
const done = x$; // intended to wait for the something in the task to finish

However it would have wildly different performance characteristics.

gbtitus commented 6 years ago

Just a nit, but don't you want a coforall instead of a forall? (To create a specific number of tasks.)

Also, wouldn't this do what you want, creating 100 tasks on Locale[0] with one communication from Locale[1]? Or am I missing something?

var x : atomic int;
var y : atomic int;
on Locales[1] {
  on Locales[0] {
    coforall i in 1 .. 100 {
      begin {
        x.fetchAdd(i);
        y.fetchAdd(i);
      }
    }
  }
  chpl_flushAsync();
}
mppf commented 6 years ago

I also wonder if introducing another data type would allow easy description of the aggregation / coalescing / buffering of operations? Maybe something conceptually similar to making a user-defined reduction?

Here is a sketch of the user-defined reduction angle with definitely wrong syntax:

var x : atomic int;
var y : atomic int;
class MyReduction {
  proc accumulate(x:int) {
     ... do something intelligent with x and y and buffering
  }
  proc combine(x:int) { ... }
}
var myReduction = new MyReduction(...);
on Locales[1] {
   forall i in 1 .. 100 with (myReduction reduce)  {
         myReduction reduce= i;
   }
}
LouisJenkinsCS commented 6 years ago

It seems to me that it would be correct for begin on to be buffered always, provided that we "flush" the buffer on a memory fence. E.g.

I'm not sure it should be performed implicitly at memory fences. Flushing the buffer, I assume, is performed for the entire node, so flushing on a timer and whenever the buffer is full may be good enough. Unless each thread gets its own buffer, which could be fine if the compiler can determine that there is a dependency between the original task and the remote task, such as through a sync or atomic variable.

mppf commented 6 years ago

It seems to me that it would be correct for begin on to be buffered always, provided that we "flush" the buffer on a memory fence. E.g.

I'm not sure if it should be performed implicitly at memory fences. Flushing the buffer, I assume, is performed for the entire node... Unless each thread gets its own buffer...

If it's built like --cache-remote or on top of it, it has 1 buffer per system thread (vs locale or Chapel task).

LouisJenkinsCS commented 6 years ago

Just a nit, but don't you want a coforall instead of a forall? (To create a specific number of tasks.)

Only if the compiler could infer that the coforall spawns just those 100 remote tasks and does nothing else. If there were code before and/or after the begin on, then it would just end up spawning 100 local tasks, each spawning a remote task plus some other minor computation. A forall would be more efficient in that it would limit the number of local tasks spawning remote tasks.

Also, wouldn't this do what you want, creating 100 tasks on Locale[0] with one communication from Locale[1]? Or am I missing something?

It's more of a simplified example; you can do it that way, but the example isn't meant to be well optimized, or even to be an example of real code. In reality there isn't a need for the on Locale[1] either, since it just spawns a remote task back on Locale[0], but in this example there is.

gbtitus commented 6 years ago

Also, wouldn't this do what you want, creating 100 tasks on Locale[0] with one communication from Locale[1]? ...

It's more of a simplified example, .... In reality there isn't a need for the on Locale[1] either, since it just spawns a remote task back on Locale[0], but in this example there is.

Sure, understood about the Locale[1] part. What I was asking was, if the goal is to create many remote tasks with one communication, why isn't this sufficient?

on ... {
  for i in 1 .. numTasks {    // could use a forall-stmt, if _lots_ of tasks
    begin { ... } 
  }
}

I'm not getting what the buffering you're proposing would achieve that the above code doesn't.

LouisJenkinsCS commented 6 years ago

I also wonder if introducing another data type would allow easy description of the aggregation / coalescing / buffering of operations? Maybe something conceptually similar to making a user-defined reduction?

The thing is, I'm not sure... it does seem like a step in the right direction, but I'm not sure it will be applicable to non-bulk-synchronous operations where we cannot determine what to reduce immediately.

LouisJenkinsCS commented 6 years ago

Sure, understood about the Locale[1] part. What I was asking was, if the goal is to create many remote tasks with one communication, why isn't this sufficient?

I suppose my example was a bit too naive. Here is another one...

var arr1 : [someDistribution] int;
var arr2 : [someOtherDistribution] int;
proc f(i,j) {
  begin on arr1[i] do somethingTo(arr1[i]);
  begin on arr2[j] do somethingTo(arr2[j]);
}

forall i in 1..1024 {
   forall j in 1 .. 2048 {
      f(i,j);
   }
}

In this case we cannot statically determine which locale each on statement runs on; assume that someDistribution and someOtherDistribution are user-defined and unknown. Each call to f can then issue on statements to two separate locales, and those should be buffered. I believe I should have made this my original example to avoid confusion.

ronawho commented 6 years ago

This sounds like a good idea. In terms of design, the biggest concern for me is when you would flush. If you go by time, you need a background thread to periodically perform flushes (and then you need to be careful about preventing interference with other threads). We'd probably also want some manual way for users to trigger the flushing, since any time-based flush won't be right for all applications. I also don't think you'd want to buffer by default; I think that should be opt-in.

And FWIW I think there is a lot of implementation work here across the compiler, modules, and communication layer.

LouisJenkinsCS commented 6 years ago

Is there anything I can do to assist in the development of this change? (Honestly, I'd welcome the challenge and would love to get away from the module-level code for a bit.)

LouisJenkinsCS commented 6 years ago

If you go by time, you need a background thread to periodically perform flushes (and then you need to be careful about preventing interference with other threads).

Use the polling task, or any task that is spin-waiting on communications and doing nothing else.

We'd probably also want some manual way for users to trigger the flushing, since any time-based flush won't be right for all applications.

Definitely; chpl_flushAsync() would do precisely this (although whether it flushes for the thread or the entire locale is up for debate, likely the former since we already have per-thread buffers for --cache-remote, as @mppf said).

mppf commented 6 years ago

I also wonder if introducing another data type would allow easy description of the aggregation / coalescing / buffering of operations? Maybe something conceptually similar to making a user-defined reduction?

The thing is, I'm not sure... it does seem like a step in the right direction, but I'm not sure it will be applicable to non-bulk-synchronous operations where we cannot determine what to reduce immediately.

Do you think you could come up with an example to demonstrate the issue? an example where we can't determine what to reduce beforehand?

LouisJenkinsCS commented 6 years ago

Do you think you could come up with an example to demonstrate the issue? an example where we can't determine what to reduce beforehand?

Perhaps I misinterpreted your example, but in that case the reduction requires a loop, right? In the newer example, the reduction couldn't work (imagine that f is some library function which shouldn't expose implementation details, so the user invoking the loop won't know what reduction to use). However, if you mean a new construct that is applied to each individual begin on, not necessarily requiring a loop of any kind, then I'd agree.

I'm thinking perhaps there could be some kind of standard module that keeps track of destination buffers on each locale (hence privatized), one buffer for each potential target locale. Then instead of invoking begin on, you'd call something like DestinationBuffer.produce(targetLocale, args...). Once the buffer for a given locale is full, it can be consumed on the target locale (in fact, that is how I am attempting to handle it in my own library). This module would allow application-specific ways of producing data meant for a locale, coalescing what has been produced, and later consuming the coalesced data on the target locale.
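
As a rough sketch of that module idea (all names here are invented; a real version would be privatized and would make produce() safe to call from multiple tasks):

var total : atomic int;            // stand-in for whatever the begin on body would have done

proc consume(elt : int) {
  total.add(elt);                  // runs on the target locale
}

class DestinationBuffer {
  const capacity = 1024;
  var bufs : [LocaleSpace] [1..capacity] int;   // one buffer per target locale
  var counts : [LocaleSpace] int;

  // Called in place of 'begin on target do consume(elt);'
  proc produce(target : locale, elt : int) {
    const id = target.id;
    counts[id] += 1;
    bufs[id][counts[id]] = elt;
    if counts[id] == capacity then flush(target);
  }

  // Consume a full (or final, partially filled) buffer on the target locale.
  proc flush(target : locale) {
    const n = counts[target.id];
    if n == 0 then return;
    const toSend : [1..n] int = bufs[target.id][1..n];
    counts[target.id] = 0;
    on target {
      const localBuf = toSend;     // one bulk transfer
      forall elt in localBuf do consume(elt);
    }
  }
}

var destBuf = new DestinationBuffer();
for i in 1..100000 do              // serial here; this sketch's produce() isn't task-safe
  destBuf.produce(Locales[i % numLocales], i);
for loc in Locales do destBuf.flush(loc);       // drain the leftovers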

This does not mean that begin on shouldn't be buffered, however.

gbtitus commented 6 years ago

... Here is another [example, where] we cannot determine what locale to run [...] on ...

Ah, I've got it now, thanks. And we even have application/benchmark code that looks a lot like that, namely this in ra.chpl:

if (useOn) then
  forall (_, r) in zip(Updates, RAStream()) do
    on TableDist.idxToLocale[r & indexMask] do {
      const myR = r;
      local {
        T[myR & indexMask] ^= myR;
      }
    }
else
  ...

This is what is run for our 'RA-on' performance results, which lag the remote-memory and network-atomic versions by a factor of 10-15 on XC-16. Improving that would be nice.

mppf commented 6 years ago

Taking that RA example, couldn't that be written with a custom reduction?

This is a sketch (and again I'm sure the syntax isn't quite right):

  record RaHandler {
    // describes what locale a piece of input should run on
    proc locale(input:uint) {
      return TableDist.idxToLocale[input & indexMask];
    }
    // always runs wherever locale(input) is
    proc updateThere(input:uint) {
      T[input & indexMask] ^= input;
    }
  }

  var aggregator = new Aggregator( new RaHandler() );
  forall (_, r) in zip(Updates, RAStream()) with (reduce aggregator) {
    aggregator reduce= r;
    // on the above line, the system would take the following steps:
    //   - call the task-local accumulate function on the custom reduction Aggregator
    //   - combine these task-local updates eventually into buffers of updates that are
    //      sent to other locales
    //   - process these updates eventually on the other locales according to RaHandler
  }

How would this work for the other example? The other example wasn't concrete enough for me, so I've tried here to transmogrify it into a Graph construction:

var graph: Graph; // imagine a distributed graph data type

// Now suppose that graph.addEdge() might apply
// something to two different locales, say one locale
// owns the from node and the other owns the to node,
// and we want to update both.
// Imagine that this is in a library somewhere.
proc Graph.addEdge(i,j) {
   on FromNodesLists[i] {
     FromNodesLists[i].addForwardEdge(i, j);
   }
   on ToNodesLists[j] {
     ToNodesLists[j].addReverseEdge(i, j);
   }
}

forall (i,j) in {1..1024, 1..2048} {
  graph.addEdge(i,j);
}

Now, how would this be written in the custom-reduction style I have been bringing up?

var graph: Graph;

var graphBuilder = graph.builder();
// What is graph.builder()? The idea is that the Graph type author
// would provide a custom reduction that aggregates etc. appropriately.
// It updates the graph by the end of the reduction, but can buffer things
// up during the computation. It could be built out of a library component
// such as Aggregator above.
forall (i,j) in {1..1024, 1..2048} with (reduce graphBuilder) {
  graphBuilder reduce= (i,j);
}

I'm pretty sure this strategy can allow writing the computational / communication pattern desired. There are two questions I see:

  1. Is it too weird / unnatural to expect users of a library to write this kind of pattern in this way?
  2. Are there important computation / communication patterns that it can't address?
LouisJenkinsCS commented 6 years ago

So one thing that will be coming up soon is a much more communication-heavy computation that cannot be whisked away by an aggregate reduction: computing S-Metrics of a hypergraph. In the case of massively large hypergraphs, the adjacency lists for vertices and hyperedges can themselves be distributed. For example...

record AdjacencyList {
   var distDom;
   type neighborType;
   var neighbors : [distDom] neighborType;
}

Where, as before, both vertices and edges can have their own distribution...

var vertices : [someDistribution] AdjacencyList;
var hyperedges : [someOtherDistribution] AdjacencyList;

In the above case, it gets a lot more difficult. As for "Why would you want a distribution within another distribution?": it's possible we'll have billions, perhaps trillions, of vertices and hyperedges, where a neighbor list itself could hold millions, billions, or even trillions of elements.

Following from this, we want to compute S-Metrics (the first time I'm being introduced to this concept, but maybe @marcinz can correct me if I am wrong), where we need to be able to determine whether two hyperedges h1 and h2 are "q-intersecting" (if h1.neighborList.intersection(h2).size == q); this is needed so we can perform "s-walking", where we travel from h1 to h2. My assumption is that it would go something like...

// Could buffer into (hyperedges.size / BUFFER_SIZE) packets
forall h1 in hyperedges {
   begin on h1 {
      // Could buffer into (h1.neighborList.size / BUFFER_SIZE) packets
      forall n in h1.neighborList {
         begin on n {
            // Could buffer into (n.neighborList.size / BUFFER_SIZE) packets 
            forall h2 in n.neighborList {
               if h1.neighborList.intersection(h2).size == q then yield (h1,h2);
            }
         }
      }
   }
}

In this case, since we can have a ton of hyperedges (again, potentially trillions), a lot of these communications should be buffered. Of course, to determine when everything finishes, I can have a barrier in there somewhere to wait on. While the reduction could work, it's super awkward in this case. Also, there is likely a much more efficient algorithm, but I'm not sure, since it is currently implemented using an adjacency list. (The reason it's not a 2D graph is that I need to be able to append new vertices/hyperedges to neighbor lists without locking down the entire data structure.)

bradcray commented 6 years ago

I'm catching up on this thread after being out M-T and understand the general desire for this feature, but would like to have a more concrete case to work on before we go too far with something requiring major changes to the language, compiler, and runtime. Specifically, while I know that coming from an MPI world, there's an assumption that everything must be buffered in order to get good performance, in a sufficiently asynchronous world, latency hiding can move the bottleneck from communication to something like cores or memory, as Greg alludes to above. Specifically, once we had the ability to buffer up 100 tasks for locale 1, each of which does one op, I suspect that the next thing we'd want to do is do all those ops with one task rather than 100. While I'm not trying to suggest that there aren't problems that will require some sort of buffering in Chapel, I'd like to have a crunchy example of such a problem to dig into rather than speculating as to what it might look like and engineering based on that assumption. Ideally, we would be able to write versions of that crunchy problem with and without doing manual buffering within Chapel itself to demonstrate the potential win before going too far down the path of modifying the runtime, compiler, and modules. I'll also mention that this relates not just to @ronawho's interest in vectorized task spawning, but also to some other computational patterns he's started studying that do updates like this with and without (manual) buffering.

@LouisJenkinsCS, in your last example, I feel confused about the following idiom:

// Could buffer into (hyperedges.size / BUFFER_SIZE) packets
forall h1 in hyperedges {
   begin on h1 {
      // Could buffer into (h1.neighborList.size / BUFFER_SIZE) packets
      forall n in h1.neighborList {
         begin on n {

Specifically, if hyperedges is distributed as in your declaration, the forall loop would automatically create tasks on the remote locales owning chunks of the array, each working on its locale's local portion, so the begin + on seems unnecessarily heavyweight (i.e., the task owning an iteration h1 will already be on h1's locale and will already own a coarse chunk of hyperedges, along with other sibling tasks running on the other cores). So it seems to me that this is injecting O(|hyperedges|) tasks and on-clauses where the forall loop itself would introduce the more optimal O(|hyperedges.targetLocales|) on-clauses and O(|hyperedges.targetLocales| * numCoresPerLocale) tasks (and ditto for the neighborList). So what is lost if we simply wrote this as:

forall h1 in hyperedges {
  forall n in h1.neighborList {

?

Thanks

bradcray commented 6 years ago

Thinking about this issue a bit more on the way to work this morning, and specifically the original example, whose operations mapped well to atomic network operations, I wanted to point to issue #9652, in which @ronawho is exploring how we can expose nonblocking atomic operations to Chapel programmers. I realize that the examples on this issue have stopped using atomic operations since that first example, but wanted to point out that for things that do map to network atomics, I think we'd probably prefer to express them as atomic operations rather than on-clauses + begins, at least in the near-term (that is, until the compiler can optimize away begin + on combinations for cases where it thinks it will hurt rather than help performance).

All that said, I think even such atomic operations can benefit from being batched up (which I anticipate will be explored as follow-ons to #9652), but I suspect we'll still want to batch them at a finer-grain than the task.

LouisJenkinsCS commented 6 years ago

I'd like to have a crunchy example of such a problem to dig into rather than speculating as to what it might look like and engineering based on that assumption.

Unfortunately, I cannot actually supply you with one. The suggestion of @mppf would work just fine, so long as it was usable outside of a forall. I'm assuming it will do as I expect and handle any and all communication behind the scenes as efficiently as possible; so long as the user can make use of such an aggregate reduction on their own (which I realize now they can, by manually invoking accumulate or whatever the equivalent method is), it should be the solution that is needed.

Specifically, if hyperedges is distributed as in your declaration, the forall loop would automatically create tasks on the remote locales owning chunks of the array, so the begin + on seems unnecessarily heavyweight

You're right as well; it would be much more efficient to do it that way, and the example is insufficient. I really can't come up with anything at all at the moment.

mppf commented 6 years ago

The suggestion of @mppf would work just fine, so long as it was usable outside of a forall.

I feel like I'm missing something. What would a program wanting to do aggregation outside of a forall even look like? How would it be creating parallelism? (Edit: Did you mean usable for patterns in functions, not just those syntactically enclosed in a forall block?)

LouisJenkinsCS commented 6 years ago

I got aggregation mixed up with the begin on statement; never mind. It would be fine just being a forall.

mppf commented 6 years ago

I had a few ideas about this discussion that I'd like to bring to the table:

begin on in this discussion

begin on in this discussion really means may-begin on or non-blocking on. The key here is that we're looking for a way to do a "fire-and-forget" on statement. It's likely that we don't necessarily want the receiving locale to actually create a task for each one. begin on should be considered a placeholder for better syntax. In hindsight, I suspect the discussion would have gone more smoothly if we had used some other syntax from the start (say, non-blocking on) and then thought through whether it has the same semantic requirements as begin on. The two are certainly related, but they aren't quite the same idea.

custom reduction values use reduce intent by default?

With the custom reduction idea, I wish we didn't have to specify with ( ... reduce ...) when providing a type that is a reduction. That seems like a case for default intents to figure out that reductions used in a forall should be reduced by default.

forall semantics

What if we rely on forall's semantics that iterations can be completed in any order (and in as many tasks as we like)?

What am I talking about? As it is, the meaning of a forall is conceptually coming from two things:

  1. An assertion that the iterations can be completed in any order. We already imagine forall loops to have this property in a way that enables vectorization (e.g. see #7761) but it's also important for programs that use forall loops where the iterator driving the loop might change (e.g. because an array might have a different distribution in a different program configuration).
  2. A parallel loop driven by an iterator (say the leader-follower or standalone iterator for an array as an example).

I think there is a temptation, once we know how forall is implemented, to think of forall as only the 2nd concept. But perhaps this is a case where we need to rely on the 1st meaning of forall.

This interpretation allows us to imagine different compilation of the combination of forall and on. Of the benchmarks we've brought up here so far, I think RA is the most reasonable to start with in these discussions, since it has many implementations & performance is relatively understood. So I'll talk through what this would mean for RA:

  forall (_, r) in zip(Updates, RAStream()) do
    on TableDist.idxToLocale[r & indexMask] do {
      const myR = r;
      local {
        T[myR & indexMask] ^= myR;
      }
    }

Here the compiler can see a forall immediately containing an on and can handle the combination specially, in the same way that begin on is handled specially by the compiler.

What could the compiler then do with the loop? I'm not yet sure how it would transform it, but the idea is that the combination forall on would imply that the remote operations could be completed in any order. Therefore the compiler and runtime could arrange to wait for some/all of the on statements at the end of the loop (instead of after each on statement). Or, going further, it could arrange to aggregate the forall on requests - e.g. by transforming the code into something like the custom reduction version of it that I showed above. (Or any other lower-level form of the loop).

Put another way, maybe we can do aggregation for forall on by default. forall on is a sort of bulk-parallel communication operation, and aggregation is good for bulk-parallel communication. The case where aggregation hurts is operations that are very latency sensitive, but the hypothesis here is that those would naturally be expressed with begin or coforall rather than forall.

Thoughts about this direction?

LouisJenkinsCS commented 6 years ago

Put another way, maybe we can do aggregation for forall on by default. forall on is a sort of bulk-parallel communication operation, and aggregation is good for bulk-parallel communication. The case where aggregation hurts is operations that are very latency sensitive, but the hypothesis here is that those would naturally be expressed with begin or coforall rather than forall.

I do like the approach of optimizing a forall on statement, but I think it should go beyond the on statement being the only thing in the loop. I think it should also be built around dependencies determined at compile-time... Currently, I have something similar to this in my code...

// Spawn a remote task on each node...
coforall loc in Locales do on loc {
   forall 1 .. perLocaleOperations {
      const vertex = someRandomVertex();
      const edge = someRandomEdge();
      if someCondition then graph.addInclusion(vertex, edge);
   }
}

Inside of addInclusion it will do the following...

inline proc addInclusion(vertex, edge) {
   on vertices[vertex] do …;
   on edges[edge] do …;
}

Now that addInclusion is inlined, I assume the loop becomes something like...

// Spawn a remote task on each node...
coforall loc in Locales do on loc {
   forall 1 .. perLocaleOperations {
      const vertex = someRandomVertex();
      const edge = someRandomEdge();
      if someCondition {
         on graph.vertices[vertex] do …;
         on graph.edges[edge] do …;
      }
   }
}

In this case, would the compiler be able to reason that this can be performed in a "fire-and-forget" manner? In the RA example, the destination of the on statement is obtained directly from the iterator's yielded element, and it's the only statement in the loop... but what about doing the same for the last statements in the forall loop, as in this case? At that point in the loop, there can be no further interaction between the original task and the remote tasks it spawned.

mppf commented 6 years ago

but what about doing the same for the last statements in the forall loop, as in this case? At that point in the loop, there can be no further interaction between the original task and the remote tasks it spawned.

Well, I think that could always work for a forall that ends with an on, but the thing I don't have an answer to is how the compiler would know if/when it could do the same thing for a forall loop containing two on statements (which is what you have). One on statement might modify something that the other on statement relies upon. (This problem would not be there if they were begin on, but as we've discussed, that's probably not quite what we want.) I'm also not sure how to communicate to the compiler a desire for this to happen without having some Aggregator idea. (Even so, I'd be pretty happy if foralls with one on statement could be optimized in this way.)

LouisJenkinsCS commented 6 years ago

What about...

// Spawn a remote task on each node...
coforall loc in Locales do on loc {
   forall 1 .. perLocaleOperations {
      const vertex = someRandomVertex();
      const edge = someRandomEdge();
      if someCondition {
         cobegin {
            on graph.vertices[vertex] do …;
            on graph.edges[edge] do …;
         }
      }
   }
}

Or some similar statement, maybe a new async { … } block? In the case above, we just want to "fork-join" two tasks at the end of a forall loop, which could make use of the same optimization as a loop whose last statement is an on.

LouisJenkinsCS commented 6 years ago

@mppf

I have a good example where aggregation is sorely needed. I decoupled one of my algorithms from the abstraction currently being built to see whether the issue had to do with privatization. It's an example that could be experimented with to see how to improve overall performance; it is small enough to fit within a single file and ~50 lines of code. Performance is really bad, even for a simple test like this.

use BlockDist;
use Time;
use Random;

record Edge {
   var vertices : list(int);
   var lock : atomic bool;

   proc addVertex(v) {
     while lock.testAndSet() do chpl_task_yield();
     vertices.push_back(v);
     lock.clear();
   }
}

record Vertex {
   var edges : list(int);
   var lock : atomic bool;

   proc addEdge(e) {
     while lock.testAndSet() do chpl_task_yield();
     edges.push_back(e);
     lock.clear();
   }
}

config const numVertices = 1024 * 1024;
config const numEdges = 2 * numVertices;
config const probability = 0.001;

const vertexSpace = {0..#numVertices};
const edgeSpace = {0..#numEdges};
const vertexDom = vertexSpace dmapped Block(boundingBox=vertexSpace, targetLocales = Locales[0..numLocales-1 by 2]);
const edgeDom = edgeSpace dmapped Block(boundingBox = edgeSpace, targetLocales = Locales[1..numLocales-1 by 2]);

var vertex : [vertexDom] Vertex;
var edge : [edgeDom] Edge;
var inclusionsToAdd = (numVertices * numEdges * probability) : int;

var timer = new Timer();
timer.start();
coforall loc in Locales do on loc {
  var perLocaleInclusions = (inclusionsToAdd / numLocales) + (if here.id == 0 then (inclusionsToAdd % numLocales) else 0);
  coforall tid in  0..#here.maxTaskPar {
    var perTaskInclusions = perLocaleInclusions / here.maxTaskPar + (if tid == 0 then (perLocaleInclusions % here.maxTaskPar) else 0);
    var randStream = new RandomStream(int(64), parSafe = false, seed = here.id * here.maxTaskPar + tid);
    for 1..perTaskInclusions {
      var v = randStream.getNext(0, numVertices - 1);
      var e = randStream.getNext(0, numEdges - 1);
      on vertex[v] do vertex[v].addEdge(e);
      on edge[e] do edge[e].addVertex(v);
    }
  }
}
timer.stop();
writeln(timer.elapsed());

Edit: To reduce the amount of work needing to get done (and to complete within a wallclock time of 5 minutes), I reduced probability to 0.001, which means the total number of inclusions is numVertices * numEdges * probability = 1e6 * 2e6 * 1e-3 = 2e9; divided over 32 nodes with 44 cores each, that is 2e9 / 32 / 44 ≈ 1.4e6, or about 1.4 million units of work per task. This is a fast version of Erdos-Renyi generation of a relatively small hypergraph, and it takes 240 seconds to complete.

Edit2: Again, remember this is without the hypergraph abstraction, this is using Chapel's domains and arrays.

Edit3: Interesting update: making the on statements above begin on results in degraded performance. As Locale 0 begins its work first, it spawns a ton of remote tasks that fill up the task queues of the other 31 locales. I'm assuming the issue is a lot of fine-grained execute_on messages, which aggregating would help significantly. However, another issue is definitely going to be whether the tasking layer can handle it all.

LouisJenkinsCS commented 6 years ago

I would like to note that as we increase the number of vertices and edges, the potential for contended access between threads becomes nil. There wouldn't be much benefit from coalescing insertions into each individual vertex and edge; the benefit comes just from performing the operations in bulk locally. Hence, the idea of spawning multiple tasks in batch would be a significant short-term solution that would considerably speed up this application.

LouisJenkinsCS commented 6 years ago

At the suggestion of @benharsh, I did add parSafe = false as an argument to the random stream, and performance did not improve. I can possibly add a quick example of a 'destination buffer' solution and show how it scales as a potential solution to the problem. Of course, this 'solution' is one that Chapel should offer, and it would require a destination buffer per node (or even per dedicated task/thread) per operation (as the buffer needs to hold something specific for each operation), which needless to say is very ugly and bulky. I was thinking of creating an abstraction, but due to pressing time constraints I think a more long-term solution is needed... However, again, for the sake of experimentation and researching the problem, I can construct and provide the alternative.

bradcray commented 6 years ago

it will spawn a ton of remote tasks that fill up the task queues of the other 31 locales. I'm assuming the issue is a lot of fine-grained execute_on messages, which aggregating would help significantly. However, another issue is definitely going to be whether the tasking layer can handle it all.

This observation is the kind of thing that made me (and @gbtitus) suggest that perhaps aggregating tasks is not the ideal solution, as it might just push the bottleneck elsewhere; maybe instead the computations themselves need to be aggregated, with a single (or modest) number of tasks performing those updates. This is what I was trying to say in my previous comments: what would it take to mock up a user-level aggregation of updates to be performed (in an array, say), communicate those updates (via an array copy, say), and then perform them locally (using a forall loop to get appropriate parallelism)?
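
For example, here is a rough sketch along those lines (names and sizes invented; the point is that the batched updates travel as one array copy and are applied by a single forall's worth of tasks rather than one task per update):

config const tableSize = 1000000;
var T : [0..#tableSize] atomic int;   // stand-in for the remote data structure

// Steps 2 and 3: communicate the batched updates with one array copy,
// then perform them locally with a forall.
proc applyBatch(updates : [] int, target : locale) {
  on target {
    const localUpdates = updates;     // one bulk copy
    forall u in localUpdates do       // apply locally, in parallel
      T[u % tableSize].add(1);
  }
}

on Locales[numLocales-1] {
  var updates : [1..10000] int;
  forall (u, i) in zip(updates, 1..) do  // step 1: aggregate updates locally
    u = i * 31;
  applyBatch(updates, Locales[0]);
}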

LouisJenkinsCS commented 6 years ago

Call me hard-headed, but I still think there is another way that can abstract/lift this away from the user. You're right that the bottleneck will end up somewhere, that the most efficient solution is to hand-roll your own aggregation and coalescing, and that it is a great idea for Chapel to aid the user in rolling their own... but I still want to try the less-ideal way for now (as it seems like a more fun challenge).

LouisJenkinsCS commented 6 years ago

New Issue on design of aggregation library here: #10386

LouisJenkinsCS commented 6 years ago

Marking this issue as resolved. My aggregation library improves performance phenomenally... the performance improvements are documented below.

Without Aggregation Library

Locales   Time (s)
1         20.5788
2         89.3139
4         67.3591
8         46.6481
16        37.9097
32        33.2092
64        25.4192

With Aggregation Library

Locales   Time (s)
1         17.1601
2         57.5139
4         29.0504
8         15.4141
16        8.17216
32        3.80618
64        2.07434

Significant improvements; I would appreciate it if the library could be included as an official package. Here is a write-up of the above program using the new library...

use BlockDist;
use Time;
use Random;

record Edge {
   var vertices : list(int);
   var lock : atomic bool;

   proc addVertex(v) {
     while lock.testAndSet() do chpl_task_yield();
     vertices.push_back(v);
     lock.clear();
   }
}

record Vertex {
   var edges : list(int);
   var lock : atomic bool;

   proc addEdge(e) {
     while lock.testAndSet() do chpl_task_yield();
     edges.push_back(e);
     lock.clear();
   }
}

enum InclusionType { VertexType, EdgeType }

config const numVertices = 1024 * 1024;
config const numEdges = 2 * numVertices;
config const probability = 0.001;

const vertexSpace = {0..#numVertices};
const edgeSpace = {0..#numEdges};
const vertexDom = vertexSpace dmapped Block(boundingBox=vertexSpace, targetLocales = Locales[0..numLocales-1 by 2]);
const edgeDom = edgeSpace dmapped Block(boundingBox = edgeSpace, targetLocales = Locales[1..numLocales-1 by 2]);

var vertex : [vertexDom] Vertex;
var edge : [edgeDom] Edge;
var inclusionsToAdd = (numVertices * numEdges * probability) : int;
var aggregator = new Aggregator((int, int, InclusionType));
proc handleBuffer(buf : Buffer((int, int, InclusionType)), loc : locale) {
  on loc {
    forall (src, dest, srcType) in buf {
      select srcType {
        when InclusionType.VertexType {
          vertex[src].addEdge(dest);
        }
        when InclusionType.EdgeType {
          edge[src].addVertex(dest);
        }
      }
    }
    buf.finished();
  }
}

var timer = new Timer();
timer.start();
coforall loc in Locales with (in aggregator) do on loc {
  var perLocaleInclusions = (inclusionsToAdd / numLocales) + (if here.id == 0 then (inclusionsToAdd % numLocales) else 0);
  sync coforall tid in  0..#here.maxTaskPar {
    var perTaskInclusions = perLocaleInclusions / here.maxTaskPar + (if tid == 0 then (perLocaleInclusions % here.maxTaskPar) else 0);
    var randStream = new RandomStream(int(64), parSafe = false, seed = here.id * here.maxTaskPar + tid);
    for 1..perTaskInclusions {
      var v = randStream.getNext(0, numVertices - 1);
      var e = randStream.getNext(0, numEdges - 1);
      var vBuf = aggregator.aggregate((v, e, InclusionType.VertexType), vertex[v].locale);
      if vBuf != nil then begin handleBuffer(vBuf, vertex[v].locale);
      var eBuf = aggregator.aggregate((e, v, InclusionType.EdgeType), edge[e].locale);
      if eBuf != nil then begin handleBuffer(eBuf, edge[e].locale);
    }
  }
}
forall (buf, loc) in aggregator.flush() {
  handleBuffer(buf, loc);
}
timer.stop();
writeln(timer.elapsed());

Not hard to write, easy to reason about, no overly complicated engineering contraptions... simplicity.