mbraceproject / MBrace.Core

MBrace Core Libraries & Runtime Foundations
http://mbrace.io/
Apache License 2.0

Predicate across averageByKey, percentageByKey etc. #149

Closed isaacabraham closed 8 years ago

isaacabraham commented 8 years ago

Something I've noticed from the House Price example in the starter kit is that it would be useful to be able to discard groups based on an arbitrary predicate. For example, consider the following sample:

let averagePricesTask =
    persistedHousePrices
    |> CloudFlow.averageByKey
          (fun row -> (row.TownCity, row.Street))
          (fun row -> float row.Price)
    |> CloudFlow.cache
    |> cluster.CreateProcess

The problem here is that it brings back every grouping, even ones with only 1 item. If you want more control, you have to fall back to the less efficient groupBy:

let averagePricesTask =
    persistedHousePrices
    |> CloudFlow.groupBy(fun row -> (row.TownCity, row.Street))
    |> CloudFlow.filter(fun (_, items) -> items |> Seq.length > 5) // only bring back streets with more than 5 houses in them.
    |> CloudFlow.map(fun (key, items) -> key, items |> Seq.averageBy(fun row -> float row.Price))
    |> CloudFlow.cache
    |> cluster.CreateProcess

It would be good to have a generalised filter, applied to each grouping, that limits which groupings are returned:

let averagePricesTask =
    persistedHousePrices
    |> CloudFlow.averageByKey
          (fun row -> (row.TownCity, row.Street))
          (fun row -> float row.Price)
          (fun group -> group |> Seq.length > 5) // grouping filter predicate
    |> CloudFlow.cache
    |> cluster.CreateProcess

I have no idea whether this is possible whilst retaining the efficiency of averageByKey rather than falling back to groupBy, though, so I'm happy to hear your thoughts on this @palladin @dsyme.

palladin commented 8 years ago

In general, combinators that help reduce IO are a good design goal, but this particular case cannot work, because you need to aggregate all the partial results before you can apply the filter predicate.

isaacabraham commented 8 years ago

Makes sense. But there must be, somewhere, some aggregation of the partial results being done in order to calculate the average?

palladin commented 8 years ago

Yes, you can apply a filter predicate to partially aggregated results, but semantically it's not correct to filter out results prematurely.
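To see why filtering partial results is premature, here is a small in-memory sketch. It assumes each partition has already reduced its rows to per-key (count, sum) pairs, and then applies the "more than 5 items" predicate from the proposal at two different points. The keys "A" and "B" and all the numbers are made up for illustration. This is not MBrace code.

```fsharp
// Hypothetical per-partition partial results: key -> (count, sum of prices)
let partition1 = [ "A", (3, 300.0); "B", (7, 1400.0) ]
let partition2 = [ "A", (3, 330.0); "B", (1, 100.0) ]

// The grouping predicate from the proposal: keep groups with more than 5 items
let keepGroup (count : int, _sum : float) = count > 5

// Merge the partial (count, sum) states for each key across partitions
let combine (partials : (string * (int * float)) list list) =
    partials
    |> List.concat
    |> List.groupBy fst
    |> List.map (fun (key, states) ->
        let count = states |> List.sumBy (fun (_, (c, _)) -> c)
        let sum = states |> List.sumBy (fun (_, (_, s)) -> s)
        key, (count, sum))
    |> List.sortBy fst

// Premature: filter each partition's partial results, then combine
let premature =
    [ partition1; partition2 ]
    |> List.map (List.filter (snd >> keepGroup))
    |> combine

// Correct: combine all partial results first, then filter on the totals
let correct =
    combine [ partition1; partition2 ]
    |> List.filter (snd >> keepGroup)
```

Filtering too early drops key "A" entirely, even though it has 6 items in total. It also understates "B" (7 items instead of 8). Filtering after the final merge keeps both keys with their true counts.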

isaacabraham commented 8 years ago

Where does the "final" aggregation occur? I suppose each partition needs to know:

  1. The average
  2. The number of elements in that partial set

Would that be correct?
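A quick sketch of the two options: an (average, count) pair per partition is enough, because it determines the sum. However, carrying (count, sum) instead makes the merge plain addition, and the average is computed only once at the end. Carrying (count, sum) is also the shape palladin's foldBy suggestion later in the thread relies on. The function names here are hypothetical and the numbers are illustrative.

```fsharp
// Option 1: each partial is (average, count); merging must weight by count
let mergeAverages (avg1 : float, n1 : int) (avg2 : float, n2 : int) =
    let n = n1 + n2
    ((avg1 * float n1 + avg2 * float n2) / float n, n)

// Option 2: each partial is (count, sum); merging is simple addition
let mergeSums (n1 : int, sum1 : float) (n2 : int, sum2 : float) =
    (n1 + n2, sum1 + sum2)

// Partition 1: 3 houses averaging 200.0 (sum 600.0)
// Partition 2: 1 house at 500.0 (sum 500.0)
let viaAverages = mergeAverages (200.0, 3) (500.0, 1)
let (totalCount, totalSum) = mergeSums (3, 600.0) (1, 500.0)
let viaSums = (totalSum / float totalCount, totalCount)
```

Both routes give an average of 275.0 over 4 houses. The (count, sum) form avoids repeatedly multiplying and dividing averages when many partitions are merged.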

palladin commented 8 years ago

We could have a variant of foldBy, such as foldByWithFilter, but the problem is defining what foldByWithFilter should mean semantically.

isaacabraham commented 8 years ago

Alternatively, perhaps the output could be a tuple containing both the average and the number of elements in the set?

palladin commented 8 years ago

It is very easy to implement this kind of combinator with foldBy. By the way, do you know whether Spark has something equivalent?

isaacabraham commented 8 years ago

No idea. I just thought it would be useful, as it's something I wanted to do :-) Maybe leaving it to foldBy is the best approach, then. Can you show me an example of how I'd achieve what I'm after?

palladin commented 8 years ago

It is very easy: check this https://github.com/mbraceproject/MBrace.Core/blob/master/src/MBrace.Flow/Combinators.fs#L728 and then just add |> map (fun (key, (count, sum)) -> (key, LanguagePrimitives.DivideByInt sum count, count))
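Put together, the suggestion works in three steps: fold each key to a (count, sum) state, map to (key, average, count), and only then filter on the count. The sketch below runs locally, with plain lists standing in for partitions. `foldByLocal` is a hypothetical in-memory stand-in that takes a key projection, folder, combiner and initial-state function, in the spirit of foldBy. It is not the MBrace API. On a cluster, the same folder, combiner and initial state would be passed to the real foldBy, whose exact signature is best checked in Combinators.fs. `HouseRow` and the sample data are also hypothetical.

```fsharp
// Hypothetical stand-in for a row of the house-price dataset
type HouseRow = { TownCity : string; Street : string; Price : int }

// Hypothetical local stand-in for foldBy: fold each partition per key,
// then merge the per-partition states for each key with the combiner.
let foldByLocal (projection : 'T -> 'Key) (folder : 'State -> 'T -> 'State)
                (combiner : 'State -> 'State -> 'State) (init : unit -> 'State)
                (partitions : 'T list list) : ('Key * 'State) list =
    partitions
    |> List.collect (fun partition ->
        partition
        |> List.groupBy projection
        |> List.map (fun (key, rows) -> key, List.fold folder (init ()) rows))
    |> List.groupBy fst
    |> List.map (fun (key, states) -> key, states |> List.map snd |> List.reduce combiner)

let row town street price = { TownCity = town; Street = street; Price = price }

// Two illustrative partitions
let partitions =
    [ [ row "London" "High St" 100; row "London" "High St" 200
        row "London" "High St" 300; row "Leeds" "Mill Lane" 150 ]
      [ row "London" "High St" 400; row "London" "High St" 500
        row "London" "High St" 600; row "Leeds" "Mill Lane" 250 ] ]

let averagesWithCounts =
    partitions
    |> foldByLocal (fun r -> (r.TownCity, r.Street))
                   (fun (count, sum) r -> (count + 1, sum + float r.Price))
                   (fun (c1, s1) (c2, s2) -> (c1 + c2, s1 + s2))
                   (fun () -> (0, 0.0))
    // (key, average, count), as in the one-liner above
    |> List.map (fun (key, (count, sum)) -> (key, LanguagePrimitives.DivideByInt sum count, count))
    // The grouping predicate, applied only after the final aggregation
    |> List.filter (fun (_, _, count) -> count > 5)
```

Only ("London", "High St") survives, with an average of 350.0 over 6 houses. Because the filter runs on the final counts, it avoids the premature-filtering problem raised earlier in the thread.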

isaacabraham commented 8 years ago

Ah, that's super easy. Closing this, then :-)