chapel-lang / chapel

a Productive Parallel Programming Language
https://chapel-lang.org

Patterns Requiring Aggregation #9848

Open mppf opened 6 years ago

mppf commented 6 years ago

This issue is meant to include several relatively simple Chapel examples for which aggregation is important to achieve good performance. The idea is that an aggregation strategy (or group of strategies) should be able to handle these patterns. In-depth discussion of individual patterns or approaches to achieve the aggregation should happen somewhere else.

Group 1: Updating

Atomic Histogram

// A is a distributed array
// rindex is a (differently) distributed array
forall r in rindex {
  A[r].add(1, order=memory_order_relaxed);
}

This is very similar to some versions of RA (the HPCC RandomAccess benchmark). See also #9782 and https://github.com/jdevinney/bale/blob/master/apps/histo_src/histo_agi.upc

See also https://github.com/chapel-lang/chapel/blob/master/test/studies/bale/histogram/histo-atomics.chpl
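
For comparison, here is a rough sketch of what a hand-written, source-level aggregation of the histogram update might look like. It is not taken from any of the linked implementations. It assumes a recent Chapel release (the `blockDist.createDomain` and `list.pushBack` names), and the sizes `n` and `m`, the fill pattern for `rindex`, and the one-task-per-locale structure are all choices made only for illustration.

```chapel
use BlockDist, List;

config const n = 1000,   // number of updates (illustrative)
             m = 64;     // number of histogram bins (illustrative)

const ADom = blockDist.createDomain({0..#m});
const RDom = blockDist.createDomain({0..#n});
var A: [ADom] atomic int;
var rindex: [RDom] int;

// Deterministic stand-in for the random indices used by the real benchmark.
forall i in RDom do rindex[i] = (i * 7) % m;

coforall loc in Locales do on loc {
  // One buffer of pending bin indices per destination locale.
  var bufs: [0..#numLocales] list(int);

  // Sort this locale's updates by the locale that owns the target bin.
  for i in RDom.localSubdomain() {
    const r = rindex[i];
    bufs[A[r].locale.id].pushBack(r);
  }

  // Ship each buffer in one bulk copy, then apply the updates locally.
  for dst in 0..#numLocales {
    if bufs[dst].size == 0 then continue;
    const batch = bufs[dst].toArray();
    on Locales[dst] {
      const localBatch = batch;   // one array copy instead of many small messages
      for r in localBatch do A[r].add(1);
    }
  }
}
```

A real aggregator would flush buffers when they reach a size limit rather than only at the end, and would use more than one task per locale; this sketch only shows the buffering-by-destination idea.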

Histogram using single on statement

// A is a distributed array
// rindex is a (differently) distributed array
forall r in rindex {
  // note: expecting to use processor atomics and an `on` statement in the below call
  A[r].add(1);
}

The aggregation pattern of this benchmark is also very similar to RA -- see https://github.com/chapel-lang/chapel/blob/6e8996b7c9c297c80655fd9fbb884da56912d239/test/release/examples/benchmarks/hpcc/ra.chpl#L149-L155

Distributed set construction

// D is a distributed associative domain (parSafe=true)
// indexes is a (differently) distributed array of elements to add
forall idx in indexes {
  // note: expecting this to invoke an `on` statement and do locking there
  D += idx;
}

See also test/studies/labelprop/labelprop-tweets.chpl.

Distributed word count / map-reduce

// D is a distributed associative domain (parSafe=true)
// A is a distributed associative array over domain D
// words is a (differently) distributed array of elements to add
forall word in words {
  // note: expecting this to involve locking
  // note: desirable to combine these two operations into 1 update
  D += word;
  A[word].add(1);
  // perhaps both operations could be written as just A[word].add(1)?
}

Updating two distributed arrays at once

When constructing a graph with different distributed arrays storing vertex information and edge information, a case similar to histogram comes up but the body of the loop would contain two on statements in the naive version.

// supposing Elements is a distributed array
// suppose Vertices is a distributed array
// suppose Edges is a (differently) distributed array
forall (vertexInfo, edgeInfo) in Elements {
  // expecting other statements here in the actual application
  on Vertices[vertexInfo.index] do addVertex(Vertices, vertexInfo);
  on Edges[edgeInfo.edge] do addEdge(Edges, edgeInfo);
}

See also #9727 and in particular https://github.com/chapel-lang/chapel/issues/9727#issuecomment-397700766

Group 2: Gathering

Gathering from a Distributed Array

// GlobalArray is a distributed array
// LocalArray is a non-distributed array
// Idxs is a local array containing indices of GlobalArray to gather
forall (idx,j) in zip(Idxs,1..) {
  LocalArray[j] = GlobalArray[idx];
}

See also https://github.com/jdevinney/bale/blob/master/apps/ig_src/ig_agi.upc
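
As a point of reference, a manually aggregated gather could be sketched roughly as follows. This is an illustration rather than the approach used in the linked code: it assumes a recent Chapel release (`blockDist.createDomain`, `list.pushBack`), uses 0-based arrays instead of the `1..` range above, and the sizes and fill patterns are made up for the example.

```chapel
use BlockDist, List;

config const n = 100,   // size of GlobalArray (illustrative)
             k = 20;    // number of elements to gather (illustrative)

const GDom = blockDist.createDomain({0..#n});
var GlobalArray: [GDom] int;
forall i in GDom do GlobalArray[i] = 10 * i;

var Idxs: [0..#k] int;
for j in 0..#k do Idxs[j] = (j * 13) % n;
var LocalArray: [0..#k] int;

// Bucket the positions j by the locale that owns GlobalArray[Idxs[j]].
var slots: [0..#numLocales] list(int);
for j in Idxs.domain do
  slots[GlobalArray[Idxs[j]].locale.id].pushBack(j);

// One request and one reply per source locale instead of one GET per element.
for src in 0..#numLocales {
  if slots[src].size == 0 then continue;
  const js = slots[src].toArray();
  const want = [j in js] Idxs[j];     // indices requested from src
  var got: [want.domain] int;
  on Locales[src] {
    const localWant = want;           // bulk copy of the request
    var vals: [localWant.domain] int;
    for t in localWant.domain do vals[t] = GlobalArray[localWant[t]];
    got = vals;                       // bulk copy of the reply
  }
  for t in js.domain do LocalArray[js[t]] = got[t];
}
```

The loop over source locales is serial here to keep the sketch short; the requests are independent and could be issued concurrently.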

Group 3

Topo Sort

See https://github.com/jdevinney/bale/blob/master/apps/topo_src/toposort_agi.upc and #9814. TODO: put a sketch of it here.

See https://github.com/chapel-lang/chapel/tree/master/test/studies/bale/toposort

LouisJenkinsCS commented 6 years ago

Question: Isn't data aggregation and coalescing the only thing that MPI languages and libraries have over Chapel? If so, once Chapel has native and polished support for this, won't Chapel basically be the ultimate one-true language in HPC since it would be able to finally compete on all other benchmarks?

bradcray commented 6 years ago

Not to suggest that a good aggregation story is undesirable, but doesn't MPI require the user to do the data aggregation and coalescing, and it just takes care of the communication?

(This is essentially what I was suggesting over on issue #9727... for a given problem benefitting from aggregation, how large a gain can be obtained by source-level aggregation and coalescing, and then what could/should the language or compiler do to help realize those same gains with less effort by the programmer?)

LouisJenkinsCS commented 6 years ago

It does? Whoops, my fault. I got ahead of myself: I've never actually used MPI and just assumed it offered that because of how much performance you get. It makes sense that it requires users to hand-write their own aggregation.

LouisJenkinsCS commented 6 years ago

@mppf Are there complete examples of these programs that I can test my aggregation library on?

mppf commented 6 years ago

@LouisJenkinsCS - I updated the issue description with links to implementations in the test system where I know they exist.

LouisJenkinsCS commented 6 years ago

Btw, I tried to think of a way to use the Aggregator (the aggregation library proposed in #10386) to solve these problems...

Let's start with the naive version...

// D is a distributed associative domain (parSafe=true)
// A is a distributed associative array over domain D
// words is a (differently) distributed array of elements to add
forall word in words {
  // note: expecting this to involve locking
  // note: desirable to combine these two operations into 1 update
  D += word;
  A[word].add(1);
  // perhaps both operations could be written as just A[word].add(1)?
}

Which would require synchronization (in a distributed context at that) each time to check to see if word is in the domain D and add it to it if it isn't... which would presumably lock down A as well since resizing D would resize A. Not sure how you expected that to work, but continuing on to the 'aggregated and coalesced' version using Aggregator...

// L is a lock (test-and-test-and-set spinlock or 'sync' variable)
// D is a distributed associative domain (parSafe=false)
// A is a distributed associative array over domain D (not atomic)
// words is a (differently) distributed array of elements to add
// Buffer handler, always sent to locale 0 to coordinate updating global array
proc handleBuffer(buf : Buffer(words.eltType)) {
   // Perform coalescing... May or may not be appropriate based on how often duplicates are expected
   // but since all locales jump to Locale 0 to update 'A' and 'D', it would be best to minimize computation
   // by reducing the work here
   var _D : domain(words.eltType);
   var _A : [_D] int;
   for word in buf {
      _D += word;
      _A[word] += 1;
   }
   buf.done();

   // Dispatch on Locale 0
   on Locales[0] {
      L.acquire();
      D += _D; // Can this work? Appending one domain to another?
      forall d in _D do A[d] += _A[d];
      L.release();
   }
}
var aggregator = new Aggregator(words.eltType);
sync forall word in words {
  // Explicitly aggregate to self as we don't want to acquire lock each time too...
  var buf = aggregator.aggregate(word, Locales[0]);
  if buf != nil then begin handleBuffer(buf);
}
// Flush leftovers (note: destination is always locale 0)
forall (buf, _) in aggregator.flush() {
   handleBuffer(buf);
}

@mppf Any thoughts? The same can be done for the 'Distributed Set' thing too. We need to update the domain on Locale 0 to prevent race conditions, but perhaps we can also use the same Aggregator to aggregate the bulk update of the counter as well? Or maybe we can have two aggregators that aggregate two separate types of data; one for other locales to safely update the domain, and another for Locale 0 to send distributed values.
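
The coalescing step inside the buffer handler can be tried on its own, without the Aggregator. The following is a minimal single-locale sketch under stated assumptions: the contents of `buf` are made up, `D` and `A` are plain local stand-ins for the distributed domain and array, and the merge is written as an explicit loop rather than relying on domain-to-domain `+=`.

```chapel
// Hypothetical buffer contents; in the proposal these would come from the Aggregator.
const buf = ["the", "cat", "the", "hat", "the"];

// Coalesce: count duplicates locally before touching shared state.
var localD: domain(string);
var localA: [localD] int;
for word in buf {
  localD += word;        // no-op if the word is already present
  localA[word] += 1;
}

// Local stand-ins for the shared domain D and array A.
var D: domain(string);
var A: [D] int;

// Merge: one update per distinct word instead of one per occurrence.
for word in localD {
  D += word;
  A[word] += localA[word];
}
```

With coalescing, the number of updates applied to the shared structures depends on the number of distinct words in the buffer, not on its length, which is why it pays off mainly when duplicates are common.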

mppf commented 6 years ago

@LouisJenkinsCS - let's create an issue to discuss aggregating in Word Count specifically & move this discussion over there, thanks.