chapel-lang / chapel

a Productive Parallel Programming Language
https://chapel-lang.org
Other
1.79k stars 421 forks source link

Distributed Word Count using Aggregation Library #10578

Open LouisJenkinsCS opened 6 years ago

LouisJenkinsCS commented 6 years ago

As per request of @mppf in https://github.com/chapel-lang/chapel/issues/9848#issuecomment-408908557

I decided to think up a way to use the Aggregator (the aggregation library proposed in #10386) to solve this particular problem...

Lets start by listing the naïve version...

// D is a distributed associative domain (parSafe=true)
// A is a distributed associative array over domain D
// words is a (differently) distributed array of elements to add
forall word in words {
  // note: expecting this to involve locking
  // note: desirable to combine these two operations into 1 update
  D += word;
  A[word].add(1);
  // perhaps could be written as A[word].add(1) ?
}

Which would require synchronization (in a distributed context at that) each time to check to see if word is in the domain D and add it to it if it isn't... which would presumably lock down A as well since resizing D would resize A. Not sure how you expected that to work, but continuing on to the 'aggregated and coalesced' version using Aggregator...

// L is a lock (test-and-test-and-set spinlock or 'sync' variable)
// D is a distributed associative domain (parSafe=false)
// A is a distributed associative array over domain D (not atomic)
// words is a (differently) distributed array of elements to add

// Update aggregator is used to aggregate updates to distributed word count
// Always from Locale 0 to Locales 1..N
var updateAggregator = new Aggregator((words.eltType, int));
proc handleUpdateBuffer(buf : Buffer((words.eltType, int)), loc : locale) {
   on loc do [(word, count) in buf] A[word].add(count);
   buf.done();
}
// Word aggregator is used to aggregate words to Locale 0 (push-based prefetch?)
// Always from Locales 1..N to Locale 0
var wordAggregator = new Aggregator(words.eltType); 
proc handleWordBuffer(buf : Buffer(words.eltType)) {
   // Perform coalescing... May or may not be appropriate based on how often duplicates are expected
   // but since all locales jump to Locale 0 to update 'A' and 'D', it would be best to minimize computation
   // by reducing the work here
   var _D : domain(words.eltType);
   var _A : [_D] int;
   for word in buf {
      _D += word;
      _A[_D] += 1;
   }
   buf.done();

   on Locales[0] {
      const _localD = _D;
      const _localA = _A;
      L.acquire();
      D += _localD;
      // Wait for spawned asynchronous update buffer handlers if we have any
      sync forall d in _localD {
         const loc = A[d].locale;
         if loc == here then A[d].add(_localA[d]);
         else {
            var updateBuf = updateAggregator.aggregate((d, _localA[d]), loc);
            if updateBuf != nil then begin handleUpdateBuffer(updateBuf, loc);
         }
      }
      L.release();
   }
}
sync forall word in words {
  // Explicitly aggregate to self as we don't want to acquire lock each time too...
  var wordBuf = wordAggregator.aggregate(word, Locales[0]);
  if wordBuf != nil then begin handleWordBuffer(wordBuf);
}
// Flush leftover words
forall (buf, _) in wordAggregator.flush() {
   handleWordBuffer(buf);
}
// Flush leftover updates
forall (buf, loc) in updateAggregator.flush() {
   handleUpdateBuffer(buf, loc)
}

In this case, we have all locales performing a 'push-based' approach for sending work to Locale 0 via wordAggregator, instead of the pull-based approach inherent in prefetching. I believe that we must coordinate all work with Locale 0 due to the issue of synchronization. As well, due to work being 'pushed' rather than 'pulled', we can send coalesced data, such that we combine duplicate word counts we have aggregated prior to sending it.

Finally for pushing out updates, we aggregate to another aggregator that aggregates the word and the word count we have for that locale; unless the buffer is filled out, aggregating data to update it is extremely fast and local to locale 0. If the buffer is filled out, we just dispatch the buffer as expected on the target locale.

@mppf Any thoughts? This approach is very long and likely not what you are hoping for, but its one that feels efficient enough for me. Really, if you had the aggregationHandler for this kind of things (I.E not function objects but lambdas) we could have it much shorter...

proc handleUpdateBuffer(buf : Buffer((words.eltType, int)), loc : locale) {
   on loc do [(word, count) in buf] A[word].add(count);
   buf.done();
}
var updateAggregator = new Aggregator((words.eltType, int), handleUpdateBuffer); 
proc handleWordBuffer(buf : Buffer(words.eltType), loc : locale) {
   var _D : domain(words.eltType);
   var _A : [_D] int;
   for word in buf {
      _D += word;
      _A[_D] += 1;
   }
   buf.done();

   on Locales[0] {
      const _localD = _D;
      const _localA = _A;
      L.acquire();
      D += _localD;
      sync forall d in _localD {
         const loc = A[d].locale;
         if loc == here then A[d].add(_localA[d]);
         else updateAggregator.aggregate((d, _localA[d]), loc);
      }
      L.release();
   }
}
var wordAggregator = new Aggregator(words.eltType, handleWordBuffer);
sync forall word in words {
  wordAggregator.aggregate(word, Locales[0]);
}
wordAggregator.flush();
updateAggregator.flush();

But that's a story for another day.

LouisJenkinsCS commented 6 years ago

Now that I think about, you could probably acquire the lock globally, but the issue would then come down to "how slow is updating the domain and lock globally compared to doing it on Locale 0".

mppf commented 6 years ago

So, the distributed associative doesn't actually have a global lock (other than the one you introduce here). Instead, it has a lock per locale that only manages the array/domain on that locale.

I'd expect that it's important to consider taking/releasing this lock along with aggregating the updates to a given locale.

LouisJenkinsCS commented 6 years ago

Question: How do you know which locale to add the new word to? In fact, I'm interested in knowing how the distributed associative array is implemented. How do you search for elements in the distributed associative array, and does it handle cases where the word is located in some other locale's array/domain? Do insertions from a single locale ever get distributed or do they sit on that single locale?

mppf commented 6 years ago

Distributed associative uses a function, possibly provided by the user, that computes the target locale for each key. It might be based on hashing (after all non-distributed associative is currently a hashtable too).

So it's known for each key what locale would store it, whether or not it's in the table.

LouisJenkinsCS commented 6 years ago

Can I see the current implementation? Is it currently in the repository? I have a distributed hash table idea based on my previous work that I could use to help extend this, but it would be better if there was an official implementation to base this on/compare performance results to.

mppf commented 6 years ago

@LouisJenkinsCS - the current implementation is here: https://github.com/chapel-lang/chapel/blob/master/test/distributions/bradc/assoc/UserMapAssoc.chpl

It's been tested for a while but needs some API review to make it to modules/dists.