NICTA / scoobi

A Scala productivity framework for Hadoop.
http://nicta.github.com/scoobi/

Filtering with side-effect causes issues with job construction. #282

Closed: markhibberd closed this issue 11 years ago

markhibberd commented 11 years ago

Added test case on branch: https://github.com/markhibberd/scoobi/compare/filter-side-effect

With 7.0.1, a filter that contains a side effect can be run multiple times. See the test for more details, but the following formulation demonstrates it:

```scala
val dlist = (.....).filter(<some side effect>)
(dlist.size join dlist.map(...))
```
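For concreteness, here is a minimal self-contained sketch of the reported shape, assuming Scoobi's standard `ScoobiApp`/`fromTextFile`/`persist` API. The input path, counter, and output path are invented for illustration, and the counter only observes re-runs in local/in-memory execution (cluster mappers run in separate JVMs):

```scala
import com.nicta.scoobi.Scoobi._
import java.util.concurrent.atomic.AtomicInteger

object FilterSideEffect extends ScoobiApp {
  // Hypothetical side effect: counts predicate evaluations. Only
  // meaningful for in-memory/local runs; on a cluster each mapper
  // JVM would hold its own counter.
  val evaluations = new AtomicInteger(0)

  def run() {
    val dlist = fromTextFile("input.txt").filter { line =>
      evaluations.incrementAndGet()  // the side effect that exposes re-execution
      line.nonEmpty
    }

    // If the planner fuses the filter into both the size branch and
    // the map branch, the predicate runs roughly twice per record.
    val result = dlist.size join dlist.map(_.length)

    persist(toTextFile(result, "filter-side-effect-out"))
    println("predicate evaluations: " + evaluations.get)
  }
}
```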

According to @blever, the expected behaviour is that the filter should run and the size be calculated in one map job, and then the join/map should run in a second job based on the results of the first.

On the surface, the (potential) optimisation being applied would be valid for a pure function, i.e. size and map could run in parallel, each with a fused copy of the filter operation. It is possibly worth considering exposing this distinction in the future to enable such optimisations.
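To illustrate the distinction being suggested, here is a purely hypothetical sketch of how purity could be surfaced to a planner. Scoobi has no such API; every name here is invented:

```scala
// Hypothetical sketch only: Scoobi exposes no purity marker. The idea
// is that the planner could consult a flag before fusing a user
// function into more than one channel.
sealed trait Purity
case object Pure extends Purity      // safe to re-run; may be duplicated/fused
case object Effectful extends Purity // must run exactly once per element

final case class Predicate[A](f: A => Boolean, purity: Purity = Effectful)

// The planner's decision point: only duplicate a predicate across
// parallel channels when the user has declared it pure.
def mayFuseIntoMultipleChannels[A](p: Predicate[A]): Boolean =
  p.purity == Pure
```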

tonymorris commented 11 years ago

A filter that does not contain a side-effect (likely) runs multiple times too. The side-effect just happens to expose the multiple runs.

Is the fix to make it only run once, so that if there is a side-effect, it occurs as expected, and if there isn't, it is optimised?

Bit confused, sorry.

markhibberd commented 11 years ago

@tonymorris It may be quicker to run the filter multiple times in parallel in some cases, in particular where shifting data is more expensive than the computation. Agreed, it is not clear-cut in the naive example above, but it is definitely possible (although perhaps too data-dependent to be of general use).

tonymorris commented 11 years ago

Ah got it, thanks.

blever commented 11 years ago

In the example given, the mapper in the first job should perform the filter (once) and pass the result on to both the map and the first part of size (which I think is a parallelDo). This results in two output streams: the first (from the map) will go through identity reducers and be written out; the second (from the first part of size) will complete the size implementation, performing a reduction on a single reducer to produce the size, and write it out as well. The second job will read both inputs (the outputs of size and map) and perform the join, another parallelDo, in its mapper.
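Sketched as a dataflow (my reading of the description above, not output from Scoobi itself), the intended two-job plan would look roughly like:

```
Job 1, mapper:   input --> filter (runs once per record)
                             +--> map(...) --> identity reducers --> output A
                             +--> size, first part (parallelDo)
Job 1, reducer:  size, second part: reduction on a single reducer --> output B (the count)

Job 2, mapper:   read output A and output B --> join (parallelDo) --> final result
```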

etorreborre commented 11 years ago

This issue is fixed by the latest developments. All the unit tests and acceptance tests pass as of commit 2215f32.

The fix was done by rewriting the MscrsDefinition algorithm and making sure, via a ScalaCheck property, that no processing node ends up being used in more than one input or output channel.
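For reference, the shape of such a property might look like the following sketch. The real property lives in Scoobi's test suite and works on its actual plan types; `Node`, `Channel`, and the toy channel-assignment function here are invented stand-ins for the invariant being checked:

```scala
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}

object MscrChannelProperties extends Properties("MscrsDefinition") {

  // Invented stand-ins for illustration; the real property operates on
  // Scoobi's plan/graph types.
  case class Node(id: Int)
  case class Channel(nodes: List[Node])

  /** The invariant the fix enforces: no node is shared between channels. */
  def noSharedNodes(channels: List[Channel]): Boolean = {
    val all = channels.flatMap(_.nodes)
    all.distinct.size == all.size
  }

  // A toy "planner" that assigns each node to exactly one channel,
  // mimicking the behaviour the rewritten algorithm guarantees.
  def assignChannels(nodes: List[Node], width: Int): List[Channel] =
    nodes.grouped(math.max(width, 1)).map(ns => Channel(ns)).toList

  val genNodes: Gen[List[Node]] =
    Gen.listOf(Gen.choose(0, 1000)).map(_.distinct.map(Node))

  property("no node appears in more than one channel") =
    forAll(genNodes, Gen.choose(1, 5)) { (nodes, width) =>
      noSharedNodes(assignChannels(nodes, width))
    }
}
```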