mratsim / weave

A state-of-the-art multithreading runtime: message-passing based, fast, scalable, ultra-low overhead

Implement reductions #33

Closed: mratsim closed this issue 4 years ago

mratsim commented 4 years ago

The reduction system should accommodate complex reductions, for example parallel variance, logsumexp, or parallel softmax cross-entropy.

Arraymancer examples

In Arraymancer this is how parallel variance is done (https://github.com/mratsim/Arraymancer/blob/9a56648850fc34fdd9da1f9c6874a87ddecc8932/src/tensor/aggregate.nim#L100-L130):

On a whole tensor

proc variance*[T: SomeFloat](t: Tensor[T]): T =
  ## Compute the sample variance of all elements
  ## The normalization is by (n-1), also known as Bessel's correction,
  ## which partially corrects the bias of estimating a population variance from a sample of this population.
  let mean = t.mean()
  result = t.fold_inline() do:
    # Initialize to the first element
    x = square(y - mean)
  do:
    # Fold in parallel by summing remaining elements
    x += square(y - mean)
  do:
    # Merge parallel folds
    x += y
  result /= (t.size-1).T
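
For reference, the quantity computed here is the sample variance with Bessel's correction,

  s^2 = \frac{1}{n-1} \sum_{i=1}^{n} (x_i - \bar{x})^2

with the three do blocks mapping to: seeding a partial accumulator with its first squared deviation, folding the remaining elements of a chunk, and merging the partial sums produced by parallel chunks.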

On an axis

proc variance*[T: SomeFloat](t: Tensor[T], axis: int): Tensor[T] {.noInit.} =
  ## Compute the sample variance of all elements along an axis
  ## The normalization is by (n-1), as in the formal definition (Bessel's correction)
  let mean = t.mean(axis)
  result = t.fold_axis_inline(Tensor[T], axis) do:
    # Initialize to the first element
    x = square(y - mean)
  do:
    # Fold in parallel by summing remaining elements
    for ex, ey, em in mzip(x,y,mean):
      ex += square(ey - em)
  do:
    # Merge parallel folds
    x += y
  result /= (t.shape[axis]-1).T

With a 2-pass logsumexp

proc logsumexp_classic[T: SomeFloat](t: Tensor[T]): T =
  # Advantage:
  #  - OpenMP parallel
  #  - No branching in a tight loop
  # Disadvantage:
  #  - Two loops over the data, which might be costly if the tensor is big
  let alpha = t.max # first loop over data

  result = t.fold_inline() do:
    # Init first element
    x = exp(y - alpha)
  do:
    # Process next elements
    x += exp(y - alpha)
  do:
    # Merge the partial folds
    x += y

  result = alpha + ln(result)
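
The max-shift above is the standard numerically stable logsumexp identity:

  \log \sum_i e^{x_i} = \alpha + \log \sum_i e^{x_i - \alpha}, \qquad \alpha = \max_i x_i

Subtracting the maximum keeps every exponent at or below zero, so exp cannot overflow; the cost is the extra pass over the data to find \alpha.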

Similarly, softmax cross-entropy is implemented via a one-pass (streaming) logsumexp, applied in parallel over the batch axis

proc frobenius_inner_prod*[T](a,b: Tensor[T]): T =
  sum:
    map2_inline(a,b):
      x * y

proc streaming_max_sumexp*[T](t: Tensor[T]): tuple[max:T, sumexp: T] {.noSideEffect, inline.}=
  # Advantage:
  #  - Only one loop over the data
  #  - Can be done "on-the-fly"
  # Disadvantage:
  #  - no parallelization
  #  - branching in tight loop
  result.max = -Inf.T   # will store the streaming max of the tensor 
  result.sumexp = 0.T   # will store the streaming sum of exp of the tensor

  for x in t:
    if x <= result.max:
      result.sumexp += exp(x - result.max)
    else:
      result.sumexp = result.sumexp * exp(result.max - x) + 1
      result.max = x

proc logsumexp*[T: SomeFloat](t: Tensor[T]): T =
  # Alternative one-pass log sumexp
  let (max, sumexp) = t.streaming_max_sumexp
  result = max + ln(sumexp)
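
The streaming version maintains the invariant that, after k elements, max holds m_k = \max(x_1, \dots, x_k) and sumexp holds s_k = \sum_{i \le k} e^{x_i - m_k}:

  s_{k+1} = \begin{cases}
    s_k + e^{x_{k+1} - m_k} & \text{if } x_{k+1} \le m_k \\
    s_k \, e^{m_k - x_{k+1}} + 1, \quad m_{k+1} = x_{k+1} & \text{otherwise}
  \end{cases}

When a new maximum is found, the running sum is rescaled from the old reference point to the new one, so logsumexp = m + \ln(s) holds at every step of the loop.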

proc softmax_cross_entropy*[T](input, target: Tensor[T]): T =
  ## Softmax function + Cross-Entropy loss fused in one layer.
  ##
  ## Input:
  ##   - A Tensor of shape [batch_size, predicted_labels_probabilities]
  ##   - The target values of shape [batchsize, truth_labels_probability]
  ## Returns:
  ##   - Apply a softmax activation and returns the cross-entropy loss.
  when compileOption("boundChecks"):
    check_input_target(input, target)

  let batch_size = input.shape[0]
  result = frobenius_inner_prod(input, target)

  let sum_logsumexp = fold_axis_inline(input, T, fold_axis=0) do:
    x = y.logsumexp
  do:
    x += y.logsumexp
  do:
    x += y

  result = (sum_logsumexp - result) / T(batch_size)
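
The fusion works because, for target rows that sum to 1, the cross-entropy of the softmax reduces to a logsumexp minus an inner product:

  -\sum_j t_j \log \mathrm{softmax}(x)_j = -\sum_j t_j \left( x_j - \mathrm{logsumexp}(x) \right) = \mathrm{logsumexp}(x) - \langle x, t \rangle

Summed over the batch, this is exactly sum_logsumexp - frobenius_inner_prod(input, target), which the proc then divides by batch_size.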

Laser example

This is a thin wrapper over OpenMP and relies on OpenMP static scheduling.

https://github.com/numforge/laser/blob/d1e6ae6106564bfb350d4e566261df97dbb578b3/examples/ex05_tensor_parallel_reduction.nim#L47-L59

proc reduction_localsum_system_atomic[T](x, y: Tensor[T]): T =
  forEachStaged xi in x, yi in y:
    openmp_config:
      use_simd: false
      nowait: true
    iteration_kind:
      contiguous
    before_loop:
      var local_sum = 0.T
    in_loop:
      local_sum += xi + yi
    after_loop:
      result.atomicInc local_sum

Proposed syntax

Unfortunately, the Arraymancer syntax is a bit too magic, with x and y appearing out of nowhere.

And it's hard to tell whether the element is, for example, a float32 or a Tensor[float32]. Instead, a lightweight DSL could be written, resembling the following:

For a parallel sum

var finalResult: int

parallelFor i in 0 ..< 100:
  reduce(finalResult):
    neutralAccum(sum: int): 0
    foldOperation(accum: var int, nextElem: int): accum += nextElem
    mergeAccum(sum1, sum2: int): sum1+sum2

For a parallel variance

let mean = mean(mySeq)
var finalResult: int

parallelFor i in 0 ..< mySeq.len:
  captures: {mean}
  reduce(finalResult):
    neutralAccum(accum: int): 0
    foldOperation(accum: var int, x: int): accum += (x-mean)^2
    mergeAccum(acc1, acc2: int): acc1 + acc2

And the code transformation will generate a lazy tree of tasks that ends with

...
...
finalResult = sync(task0) + sync(task1)
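
To make the intended semantics of neutralAccum / foldOperation / mergeAccum concrete, here is a minimal sequential sketch in plain Nim (chunkedVarianceSum is a hypothetical helper, not Weave code) of the variance reduction above: each chunk is folded starting from the neutral element, and the per-chunk accumulators are then merged.

import std/math

proc chunkedVarianceSum(data: seq[int], mean: int, numChunks: int): int =
  ## Sequential reference for the proposed reduction semantics:
  ## each chunk plays the role of one task in the lazy task tree.
  let chunkLen = (data.len + numChunks - 1) div numChunks
  var partials = newSeq[int](numChunks)
  for c in 0 ..< numChunks:
    var accum = 0                                        # neutralAccum
    for i in c*chunkLen ..< min((c+1)*chunkLen, data.len):
      accum += (data[i] - mean)^2                        # foldOperation
    partials[c] = accum
  for p in partials:
    result += p                                          # mergeAccum

In the actual transformation, each chunk would instead be a lazily-split task returning its accumulator, and the merges happen when the child tasks are synced, as in the sync(task0) + sync(task1) line above.
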
mratsim commented 4 years ago

Closed by #34

Final syntax (high-level exported wrapper TBD):

import weave

proc sumReduce(n: int): int =
  var waitableSum: Flowvar[int]

  parallelReduceImpl i in 0 .. n, stride = 1:
    reduce(waitableSum):
      prologue:
        var localSum = 0
      fold:
        localSum += i
      merge(remoteSum):
        localSum += sync(remoteSum)
      return localSum

  result = sync(waitableSum)

init(Weave)
let sum1M = sumReduce(1000000)
echo "Sum reduce(0..1000000): ", sum1M
doAssert sum1M == 500_000_500_000
exit(Weave)
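
For comparison with the earlier proposal, here is a hypothetical adaptation (not from this issue or PR #34) of the parallel variance to the final syntax. It assumes captures: {...} is accepted inside parallelReduceImpl the same way it is in parallelFor, and passes the data as a raw pointer so the capture stays trivially copyable; varianceReduce and buf are illustrative names.

import weave

proc varianceReduce(data: openArray[float64]): float64 =
  ## Hypothetical sketch: sample variance (Bessel's correction) with the final syntax.
  let n = data.len
  doAssert n >= 2
  var mean = 0.0
  for x in data: mean += x
  mean /= n.float64

  # Assumption: captures are supported here as in parallelFor.
  let buf = cast[ptr UncheckedArray[float64]](data[0].unsafeAddr)
  var waitableSS: Flowvar[float64]

  parallelReduceImpl i in 0 .. n-1, stride = 1:
    captures: {buf, mean}
    reduce(waitableSS):
      prologue:
        var localSS = 0.0
      fold:
        let d = buf[i] - mean
        localSS += d*d
      merge(remoteSS):
        localSS += sync(remoteSS)
      return localSS

  result = sync(waitableSS) / float64(n - 1)

init(Weave)
echo varianceReduce(@[1.0, 2.0, 3.0, 4.0])  # ~1.667
exit(Weave)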