twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Refactor the cascading code in the typed API into a self contained backend #1666

Closed johnynek closed 7 years ago

johnynek commented 7 years ago

As I expected, this turned out to be easier than we had suspected for a while.

This is not ready to ship. We will need to test it a fair amount, and we will probably need to add optimizations (the graph optimizer from summingbird should be great here).

But the code is much cleaner.

It is not fully factored yet: I didn't move all the Grouped and CoGrouped code into the CascadingBackend. That part is easy; I just wanted to keep the diff size down a bit.

With this code we are close to being able to plan to any backend (once it is well factored), simply raising an error if someone starts with a raw Cascading pipe. The backend would need a map from TypedSource[T] to the platform's source type. We will also need to update Execution[_] to support pluggable backends, but in my view that should be a separate PR.
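To make the pluggable-backend idea concrete, here is a minimal Scala sketch. It is not Scalding's API: `Source`, `Pipe`, `FromRawCascading`, `Backend` and `InMemoryBackend` are hypothetical names chosen for illustration. The backend resolves each source through a map (standing in for the map from TypedSource[T] to the platform's source type) and returns an error instead of planning a pipe built from a raw Cascading pipe.

```scala
// A minimal sketch, not Scalding's API: every name here is hypothetical.
object PluggableBackendSketch {
  // Stand-in for TypedSource[T]: a source identified by name.
  final case class Source[T](name: String)

  // A tiny typed-pipe AST.
  sealed trait Pipe[T]
  final case class FromSource[T](source: Source[T]) extends Pipe[T]
  final case class Mapped[A, T](input: Pipe[A], fn: A => T) extends Pipe[T]
  // Marks a pipe that was built directly from a raw Cascading pipe.
  final case class FromRawCascading[T](description: String) extends Pipe[T]

  // A backend plans a pipe into some platform-specific representation P[T].
  trait Backend[P[_]] {
    def plan[T](pipe: Pipe[T]): Either[String, P[T]]
  }

  // In-memory backend: the "platform source type" is simply a List of rows.
  final class InMemoryBackend(sources: Map[Source[_], List[Any]]) extends Backend[List] {
    def plan[T](pipe: Pipe[T]): Either[String, List[T]] = pipe match {
      case FromSource(source) =>
        sources.get(source) match {
          // The map is untyped, so we trust that the rows match the source type.
          case Some(rows) => Right(rows.asInstanceOf[List[T]])
          case None       => Left(s"no mapping for source '${source.name}'")
        }
      case Mapped(input, fn) =>
        plan(input).map(rows => rows.map(fn))
      case FromRawCascading(description) =>
        // Only a Cascading backend could plan this node; here it is an error.
        Left(s"cannot plan a raw Cascading pipe ($description) on this backend")
    }
  }
}
```

For example, planning `Mapped(FromSource(words), (s: String) => s.length)` against a backend that maps `words` to `List("a", "bb")` yields `Right(List(1, 2))`, while a `FromRawCascading` node yields a `Left`.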

cc @piyushnarang @ianoc @isnotinvain @fwbrasil @dieu

todo:

johnynek commented 7 years ago

Also, this still blends some optimizations into the types (join composition is still handled by CoGrouped). Ideally we would do no optimizations while building the AST, and then apply rules at planning time, keeping two kinds separate: general rules that apply to all graphs, e.g. (a ++ b).map(f) == (a.map(f) ++ b.map(f)), and rules that only apply to Cascading, e.g. a GroupBy node can accept any number of input pipes.
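The general rule (a ++ b).map(f) == (a.map(f) ++ b.map(f)) can be sketched as a rewrite over a small AST. The node names (`Literal`, `Merge`, `Mapped`) and `pushMapThroughMerge` are hypothetical, not Scalding's internals; `eval` gives a reference in-memory semantics so the rewritten graph can be checked against the original. A Cascading-only rule, such as feeding any number of input pipes into one GroupBy, would live in a separate rule set and is not shown.

```scala
// A minimal sketch of a backend-independent rewrite rule; names are hypothetical.
object MapThroughMergeSketch {
  sealed trait Pipe[T]
  final case class Literal[T](rows: List[T]) extends Pipe[T]
  final case class Merge[T](left: Pipe[T], right: Pipe[T]) extends Pipe[T]
  final case class Mapped[A, T](input: Pipe[A], fn: A => T) extends Pipe[T]

  // Reference semantics, used to check that a rewrite preserves meaning.
  def eval[T](pipe: Pipe[T]): List[T] = pipe match {
    case Literal(rows)      => rows
    case Merge(left, right) => eval(left) ++ eval(right)
    case Mapped(input, fn)  => eval(input).map(fn)
  }

  // Rewrites (a ++ b).map(f) into a.map(f) ++ b.map(f), recursively.
  def pushMapThroughMerge[T](pipe: Pipe[T]): Pipe[T] = pipe match {
    case Mapped(Merge(a, b), fn) =>
      Merge(pushMapThroughMerge(Mapped(a, fn)), pushMapThroughMerge(Mapped(b, fn)))
    case Mapped(input, fn)  => Mapped(pushMapThroughMerge(input), fn)
    case Merge(left, right) => Merge(pushMapThroughMerge(left), pushMapThroughMerge(right))
    case lit @ Literal(_)   => lit
  }
}
```

Because the rule never inspects backend details, the same rule set could run before planning to Cascading or to any other platform.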

johnynek commented 7 years ago

This still does not pass all the tests.

The two issues are:

  1. In many cases line numbers were lost, since they were handled as a side effect of how TypedPipeFactory worked.
  2. Optimizations around zip of Execution seem a bit fragile, and we seem to have blocked some of them here. I think it has to do with some of the pipes looking different. I am not sure the failing test reflects a real bug, since I fear there may have been other fragile cases in the past. We should fix the issue and ideally make it more robust.

In a follow up PR, we should move more of the Execution related code into CascadingBackend.

I'm fairly optimistic this approach could allow us to:

  a. make the planning clearer and easier to understand.
  b. enable new whole-graph optimizations that we can't do well now.
  c. run on other platforms (Spark, Flink, concurrent local) with minimal changes, keeping source compatibility with the existing API: we would only raise an error if the user converts directly from a Cascading pipe but plans to another platform.

johnynek commented 7 years ago

Another optimization that occurs to me: we could now fairly easily avoid lifting every map function to A => TraversableOnce[B]. If we have an A => B, we could skip allocating an Iterator.single. Since that allocation happens on every row, avoiding it could be a measurable improvement for such jobs.
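A minimal sketch of that idea, assuming a hypothetical `RowFn` type that keeps one-to-one maps separate from flatMaps. Outputs are pushed to an `emit` callback, which stands in for a Cascading output collector (an assumption for illustration, not Scalding's code), and `Iterator` stands in for `TraversableOnce`. The `MapFn` case emits its result directly, so no `Iterator.single` wrapper is created for that row.

```scala
// A minimal sketch; RowFn, MapFn, FlatMapFn and runRow are hypothetical names.
object RowFunctionSketch {
  sealed trait RowFn[A, B]
  // Exactly one output per input row: keep the plain A => B.
  final case class MapFn[A, B](f: A => B) extends RowFn[A, B]
  // Zero or more outputs per input row.
  final case class FlatMapFn[A, B](f: A => Iterator[B]) extends RowFn[A, B]

  // Process one row, pushing every output to `emit`.
  def runRow[A, B](fn: RowFn[A, B], row: A, emit: B => Unit): Unit = fn match {
    case MapFn(f)     => emit(f(row))         // no Iterator.single wrapper needed
    case FlatMapFn(f) => f(row).foreach(emit) // general case still iterates
  }

  // The uniform approach, for contrast: every map is lifted, so each row allocates an Iterator.
  def liftToFlatMap[A, B](f: A => B): RowFn[A, B] = FlatMapFn[A, B](a => Iterator.single(f(a)))
}
```

Both representations produce the same outputs; the difference is only in what gets allocated per row.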

piyushnarang commented 7 years ago

This is pretty cool, thanks for putting this up. Will take a look in the next day or two.

johnynek commented 7 years ago

Okay, other than benchmarking the new function composition, I think this addresses most of the comments.

I think after the benchmark, this should be mergeable. Can you take another look, @fwbrasil and @piyushnarang?

johnynek commented 7 years ago

Turns out our benchmarks don't compile, which is a bummer. Rather than shave that yak right now, I'm just making the function composition use andThen, but more efficiently than before: we avoid allocating an Iterator except at the very end of a sequence of maps, and we are smarter about filters as well.
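A sketch of that kind of composition, assuming a hypothetical `Fused` representation rather than Scalding's actual code: consecutive maps are composed with `andThen` into one function, a filter after a map chain becomes a single `if`, consecutive filters are merged into one predicate, and an `Iterator` is only built when the chain is turned into a flatMap function. Shapes that do not fit fall back to an ordinary `A => Iterator[B]` extended with `Iterator.map` and `Iterator.filter`.

```scala
// A minimal sketch; Fused, Maps, MapsThenFilter and General are hypothetical names.
object FusedChainSketch {
  // A chain of row-level steps that can be turned into a flatMap function.
  sealed trait Fused[A, B] {
    def map[C](g: B => C): Fused[A, C]
    def filter(p: B => Boolean): Fused[A, B]
    def toFlatMap: A => Iterator[B]
  }

  // Only maps so far: compose them into a single function with andThen.
  final case class Maps[A, B](f: A => B) extends Fused[A, B] {
    def map[C](g: B => C): Fused[A, C] = Maps(f.andThen(g))
    def filter(p: B => Boolean): Fused[A, B] = MapsThenFilter(f, p)
    // The Iterator is created only here, at the very end of the chain.
    def toFlatMap: A => Iterator[B] = a => Iterator.single(f(a))
  }

  // A map chain followed by filters, merged into a single predicate.
  final case class MapsThenFilter[A, B](f: A => B, p: B => Boolean) extends Fused[A, B] {
    // Still one Iterator per row: test first, then apply g only to rows that pass.
    def map[C](g: B => C): Fused[A, C] = General { (a: A) =>
      val b = f(a)
      if (p(b)) Iterator.single(g(b)) else Iterator.empty
    }
    def filter(q: B => Boolean): Fused[A, B] = MapsThenFilter(f, (b: B) => p(b) && q(b))
    def toFlatMap: A => Iterator[B] = { (a: A) =>
      val b = f(a)
      if (p(b)) Iterator.single(b) else Iterator.empty
    }
  }

  // Anything else: an ordinary flatMap function, extended lazily.
  final case class General[A, B](fn: A => Iterator[B]) extends Fused[A, B] {
    def map[C](g: B => C): Fused[A, C] = General(fn.andThen(_.map(g)))
    def filter(p: B => Boolean): Fused[A, B] = General(fn.andThen(_.filter(p)))
    def toFlatMap: A => Iterator[B] = fn
  }
}
```

For example, `Maps((x: Int) => x + 1).map(_ * 2).filter(_ > 4).filter(_ % 4 == 0)` stays a single `MapsThenFilter`, and applying its `toFlatMap` to the inputs 1 through 5 emits 8 and 12.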

I think this is mergeable. What do you folks say?

fwbrasil commented 7 years ago

LGTM! The build is failing, though.

piyushnarang commented 7 years ago

👍 This looks good to me as well.