twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Issue with data being materialized after `cross` and before subsequent `map` #1905

Open ttim opened 5 years ago

ttim commented 5 years ago

At Twitter we found a case where data got serialized after `cross` and before the subsequent `map`. This can be critical, since the result of a cross is usually very large and the subsequent operations usually shrink it a lot. I created a test for this: https://github.com/ttim/scalding/commit/85ddcff8067dc04418c975bb277ed4ccdc69a58c. It seems to be reproducible only when `.toPipe` is used and two branches follow `.cross.map`.

Any ideas @johnynek ?

ttim commented 5 years ago

Removing `.orElse(composeSame)` from the default MapReduce optimization rules fixes the test.
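
For readers unfamiliar with the rule: a composeSame-style rewrite fuses two consecutive maps into a single map. The sketch below uses a hypothetical toy AST (`Pipe`, `Src`, `MapP`, `ComposeSame` are illustrative names, not Scalding's internal classes) to show the rewrite and that it preserves results; Scalding's actual rule may be defined differently.

```scala
// Hypothetical toy model of a typed-pipe graph; these are NOT Scalding's classes.
sealed trait Pipe
final case class Src(name: String) extends Pipe
final case class MapP(input: Pipe, fn: Any => Any) extends Pipe

object ComposeSame {
  // composeSame-style rewrite: MapP(MapP(p, f), g) becomes MapP(p, f andThen g).
  def composeMaps(p: Pipe): Pipe = p match {
    case MapP(MapP(inner, f), g) => composeMaps(MapP(inner, f.andThen(g)))
    case MapP(inner, f)          => MapP(composeMaps(inner), f)
    case s: Src                  => s
  }

  // Number of map stages in a linear pipe.
  def mapCount(p: Pipe): Int = p match {
    case MapP(inner, _) => 1 + mapCount(inner)
    case _: Src         => 0
  }

  // Evaluate a pipe over in-memory sample data keyed by source name.
  def run(p: Pipe, data: Map[String, List[Any]]): List[Any] = p match {
    case Src(name)      => data(name)
    case MapP(inner, f) => run(inner, data).map(f)
  }
}
```

On its own the rewrite is harmless; what matters in this issue is that fusing removes the intermediate `MapP` node, which another branch might otherwise have shared.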

ttim commented 5 years ago

I think I know what's wrong.

The test I wrote does the following:

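    val crossedMapped = TypedPipe.from(source)
      .cross(mapPipe)
      .map { case (value, map) => map(value) }

    // first branch: a single map after the cross
    crossedMapped.toPipe('value).write(sink1)
    // second branch: two consecutive maps after the cross
    crossedMapped.map(identity).toPipe('value).write(sink2)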

During the optimization phase, the two consecutive maps in the second fork are composed into one, so the only part these two graphs still have in common is the cross join. My guess is that forcing this common part to disk is done by Cascading itself.

I think it works with typed writes because, when typed writes are planned, the planner sees both writes at the same time and is able to extract their common part so it is calculated only once.

johnynek commented 5 years ago

Seems like a good guess that the interaction with `toPipe` is causing the issue.

That really hides what is going on from the optimizer.

We could add another rule saying we should always avoid materializing a cross.

It's hard to say in-general when we should materialize vs not, only looking at the static graph.

We could imagine adding a new node to the graph as a hint: even if we fan this out, recompute it rather than materializing it.

Maybe `.ephemeral` or something similar, which is just a hint to the optimizer to ignore that node when adding explicit Forks. I don't know how usable that would be, though.
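
A rough sketch of the `.ephemeral` idea, under stated assumptions: `Ephemeral` is a hypothetical graph node (it does not exist in Scalding), and `ForkPlanner.placeForks` is a hypothetical, much simplified fork-placement step. The planner counts consumers per node across all written pipes and would add a fork (materialize) wherever a node has more than one consumer, except for nodes carrying the hint, which are then recomputed in each branch.

```scala
import scala.collection.mutable

// Hypothetical toy model; NOT Scalding's classes, and `Ephemeral` does not exist in Scalding.
sealed trait Pipe
final case class Src(name: String) extends Pipe
final case class Cross(left: Pipe, right: Pipe) extends Pipe
final case class MapP(input: Pipe, fn: Any => Any) extends Pipe
// The proposed hint: "even if this fans out, don't add a fork here".
final case class Ephemeral(input: Pipe) extends Pipe

object ForkPlanner {
  def children(p: Pipe): List[Pipe] = p match {
    case Src(_)        => Nil
    case Cross(l, r)   => List(l, r)
    case MapP(in, _)   => List(in)
    case Ephemeral(in) => List(in)
  }

  // How many consumer edges point at each node, across all written pipes.
  def fanOut(roots: List[Pipe]): Map[Pipe, Int] = {
    val seen   = mutable.Set.empty[Pipe]
    val counts = mutable.Map.empty[Pipe, Int]
    def visit(p: Pipe): Unit =
      if (seen.add(p)) children(p).foreach { c =>
        counts(c) = counts.getOrElse(c, 0) + 1
        visit(c)
      }
    roots.foreach(visit)
    counts.toMap
  }

  // Nodes that would get an explicit fork (i.e. be materialized):
  // anything with more than one consumer, unless it carries the Ephemeral hint.
  def placeForks(roots: List[Pipe]): Set[Pipe] =
    fanOut(roots).collect { case (p, n) if n > 1 && !p.isInstanceOf[Ephemeral] => p }.toSet
}
```

With two branches reading a cross directly, the cross is chosen as a fork point; wrapping it in `Ephemeral` leaves no fork at all. Whether users would reliably add such a hint is exactly the usability question raised above.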

I would probably rather improve the optimization rules...

ttim commented 5 years ago

So basically what happens is:

  1. The two Pipe writes are optimized separately.
  2. One of them has two consecutive maps, which the optimizer composes into one.
  3. The only part of these two branches that our Pipe compiler sees as common is the cross result, so it creates a fork in Cascading at that point.
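
The three steps can be reproduced in a small, self-contained sketch. It assumes a hypothetical toy AST (`Pipe`, `Src`, `Cross`, `MapP`) and approximates the compiler cache's "common part" as the largest subtree both branches share (`forkPoint`, also hypothetical). Scalding's Pipe compiler and Cascading's planner are far more involved, so treat this only as an illustration of why the cross ends up as the fork point.

```scala
// Hypothetical toy model of a pipe graph; NOT Scalding's or Cascading's classes.
sealed trait Pipe
final case class Src(name: String) extends Pipe
final case class Cross(left: Pipe, right: Pipe) extends Pipe
final case class MapP(input: Pipe, fn: Any => Any) extends Pipe

object ForkPoint {
  // Step 2, composeSame-style rule: MapP(MapP(p, f), g) becomes MapP(p, f andThen g).
  def composeMaps(p: Pipe): Pipe = p match {
    case MapP(MapP(inner, f), g) => composeMaps(MapP(inner, f.andThen(g)))
    case MapP(inner, f)          => MapP(composeMaps(inner), f)
    case Cross(l, r)             => Cross(composeMaps(l), composeMaps(r))
    case s: Src                  => s
  }

  // All nodes reachable from p, including p itself.
  def subtrees(p: Pipe): Set[Pipe] = p match {
    case MapP(in, _) => subtrees(in) + p
    case Cross(l, r) => subtrees(l) ++ subtrees(r) + p
    case s: Src      => Set(s)
  }

  def size(p: Pipe): Int = p match {
    case MapP(in, _) => 1 + size(in)
    case Cross(l, r) => 1 + size(l) + size(r)
    case _: Src      => 1
  }

  // Step 3, naive stand-in for the compiler cache: the largest node both branches share.
  def forkPoint(a: Pipe, b: Pipe): Pipe =
    (subtrees(a) intersect subtrees(b)).maxBy(p => size(p))
}

object Demo {
  val lookup: Any => Any = { case (value, table: Map[Any, Any] @unchecked) => table(value) }
  val crossed = Cross(Src("source"), Src("mapPipe"))
  val branch1 = MapP(crossed, lookup)          // crossedMapped
  val branch2 = MapP(branch1, (x: Any) => x)   // crossedMapped.map(identity)

  // Step 1: each write is optimized on its own.
  val optimized1 = ForkPoint.composeMaps(branch1) // unchanged: a single map
  val optimized2 = ForkPoint.composeMaps(branch2) // two maps fused into one
}
```

Before optimization, the shared node is `branch1` (the small, mapped output); after optimizing the second write alone, the fused map no longer contains `branch1`, and the only shared node left is `crossed`.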

Knowing this, I can also write a similar test without using `toPipe`:

    val crossed = TypedPipe.from(source).cross(mapPipe)
    crossed.map { case (value, map) => map(value) }.write(sink1)
    crossed.map { case (value, map) => map(identity(value)) }.write(sink2)

We could solve the initial issue by ignoring the Pipe cache when we compile typed pipes in `toPipe`, but that seems like a very weird solution. We could also solve it by forbidding the squashing of maps (and similar operations) after `cross` and `hashJoin`. Or we could avoid materializing `hashJoin` and `cross` at all costs (this might mean we never want to reuse an existing Cascading Pipe corresponding to a hashJoin from the compiler's cache?). Or we could do something around `.ephemeral`.
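
A sketch of the second option, forbidding map squashing directly after a cross, using a hypothetical toy AST (`Pipe`, `Src`, `Cross`, `MapP`, `GuardedCompose` are illustrative, not Scalding's rules). The guard keeps the first map after a `Cross` as its own node, so it can remain the shared node between branches; the trade-off is that the second branch runs its extra map separately. `hashJoin` is not modeled here.

```scala
// Hypothetical toy model; NOT Scalding's classes or optimization rules.
sealed trait Pipe
final case class Src(name: String) extends Pipe
final case class Cross(left: Pipe, right: Pipe) extends Pipe
final case class MapP(input: Pipe, fn: Any => Any) extends Pipe

object GuardedCompose {
  // Fuse map chains, except when the inner map reads directly from a Cross:
  // keeping that inner node intact preserves it as a shareable (small) output.
  def composeMaps(p: Pipe): Pipe = p match {
    case MapP(MapP(inner, f), g) if !inner.isInstanceOf[Cross] =>
      composeMaps(MapP(inner, f.andThen(g)))
    case MapP(inner, f) => MapP(composeMaps(inner), f)
    case Cross(l, r)    => Cross(composeMaps(l), composeMaps(r))
    case s: Src         => s
  }
}
```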

I also think hashJoins (and cross in particular) should never be materialized by default. It's really hard for me to imagine a case where materialization is needed, and we already have a tweak for that: `.forceToDisk`.

johnynek commented 5 years ago

I agree that we shouldn't materialize immediately after a cross. A hashJoin is maybe a bit different, since it can be used as a filter, and in the filter case maybe we do want to checkpoint?

But as you say, the user can add `forceToDisk` if they want to insist on it in that case.

We can also add a configuration option to change the plan in that case, with the default being not to force to disk after a cross/hashJoin.
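
A sketch of the policy being discussed: by default, never checkpoint directly after a cross or hashJoin, and materialize there only if the user asks for it with `.forceToDisk` or turns on a configuration switch. The `Node` types and the `checkpointAfterJoins` flag are hypothetical (not a real Scalding config key); `.forceToDisk` is the real TypedPipe method mentioned above, modeled here only as a `ForceToDisk` node.

```scala
// Hypothetical toy model; NOT Scalding's classes or configuration.
sealed trait Node
final case class Src(name: String) extends Node
final case class Cross(left: Node, right: Node) extends Node
final case class HashJoin(left: Node, right: Node) extends Node
final case class MapN(input: Node) extends Node
final case class ForceToDisk(input: Node) extends Node

// Hypothetical planner configuration; the default keeps crosses/hashJoins unmaterialized.
final case class PlannerConfig(checkpointAfterJoins: Boolean = false)

object MaterializationPolicy {
  // Should a node that fans out to several consumers be materialized?
  def shouldMaterialize(n: Node, conf: PlannerConfig): Boolean = n match {
    case _: ForceToDisk         => true                      // explicit user request always wins
    case _: Cross | _: HashJoin => conf.checkpointAfterJoins // default: recompute instead
    case _                      => true                      // other shared nodes are materialized in this sketch
  }
}
```

Keeping the switch off by default matches the suggestion above, while a filtering hashJoin can still be checkpointed either per pipe (`forceToDisk`) or globally (the flag).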