twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Solving `sortedTake` in beam runner #1947

Closed: nownikhil closed this issue 3 years ago

nownikhil commented 3 years ago

Hi folks, we just implemented a Beam runner for Scalding. One open issue is `sortedTake`: it uses the PriorityQueue Monoid, which mutates the input elements, and that causes the job to fail. The Direct Runner enables this check to catch such issues during testing; the Dataflow runner might not enforce it.

We might break consistency guarantees if we mutate input elements. https://stackoverflow.com/questions/43142900/apache-beam-returns-input-values-must-not-be-mutated-in-any-way-when-using-lo

What would be the best way to solve this?

Maybe we could extend the PQ Monoid and override `plus` so that it does not mutate the bigger PQ and instead returns a new one, then use that. Or we could make SortedTake a member of the AST and implement it for every runner.
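A rough sketch of the first option, assuming nothing about algebird's internals: `NonMutatingPQMonoid` is a hypothetical standalone class (not algebird's `PriorityQueueMonoid`) whose `plus` copies both inputs into a fresh `java.util.PriorityQueue` instead of adding one queue into the other.

```scala
import java.util.PriorityQueue
import scala.jdk.CollectionConverters._

// Hypothetical monoid-like class, NOT algebird's PriorityQueueMonoid.
// It retains at most `max` items: the largest ones under `ord`.
final class NonMutatingPQMonoid[K](max: Int)(implicit ord: Ordering[K]) {
  require(max > 0, "must keep at least one item")

  // Heap ordered by `ord`: the head is the smallest retained item,
  // which is the one evicted once the queue grows past `max`.
  def zero: PriorityQueue[K] = new PriorityQueue[K](1, ord)

  def build(items: Iterable[K]): PriorityQueue[K] = {
    val q = zero
    items.foreach { item =>
      q.add(item)
      if (q.size > max) q.poll()
    }
    q
  }

  // Copies both inputs into a new queue; `left` and `right` are only read,
  // so values handed over by the runner are never mutated.
  def plus(left: PriorityQueue[K], right: PriorityQueue[K]): PriorityQueue[K] =
    build(left.asScala ++ right.asScala)
}
```

The trade-off is an extra copy on every `plus`, which is the price of leaving the inputs untouched.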

Happy to hear a clever solution to this.

tlazaro commented 3 years ago

Having SortedTake as a first-class AST element would allow us to implement it with Beam's `Top.of`: https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/Top.html.

If we could keep the current behavior for other backends while Beam uses `Top`, that would be the safest option from Beam's point of view.
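To make the AST option a little more concrete, here is a toy sketch. Everything in it (`Pipe`, `Source`, `Filter`, `SortedTake`, `LocalRunner`) is hypothetical and far simpler than scalding's real AST; the point is only that once `SortedTake` is its own node, each backend chooses how to evaluate it. A Beam backend could translate the node into Beam's `Top` transform, while the in-memory interpreter below simply sorts.

```scala
// Hypothetical, heavily simplified AST; not scalding's real TypedPipe AST.
sealed trait Pipe[A]
final case class Source[A](items: List[A]) extends Pipe[A]
final case class Filter[A](input: Pipe[A], p: A => Boolean) extends Pipe[A]
final case class SortedTake[A](input: Pipe[A], k: Int, ord: Ordering[A]) extends Pipe[A]

object LocalRunner {
  // Each backend pattern matches on the node types it understands.
  def run[A](pipe: Pipe[A]): List[A] = pipe match {
    case Source(items) => items
    case Filter(in, p) => run(in).filter(p)
    // A local backend can just sort: no monoid, no shared mutable state.
    case SortedTake(in, k, ord) => run(in).sorted(ord).take(k)
  }
}
```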

tlazaro commented 3 years ago

But Nikhil and I aren't sure this change to the AST is easy or viable.

johnynek commented 3 years ago

I think a way to solve it would be to pattern match on the Monoid:

https://github.com/twitter/scalding/blob/767a9457cf763f84b02f50a687bf6f75d67cb9db/scalding-beam/src/main/scala/com/twitter/scalding/beam_backend/BeamOp.scala#L62

  case EmptyGuard(MapValueStream(SumAll(pq: PriorityQueueMonoid[V]))) =>
    val ordering = pq.zero.comparator()
    // now use Beam's built-in Top.of...
johnynek commented 3 years ago

But another approach, and really the better one, would be to use an immutable heap:

https://github.com/typelevel/cats-collections/blob/master/core/src/main/scala/cats/collections/PairingHeap.scala

We could implement that in Scalding and not call out to Algebird's mutable implementation. I really regret the mutable implementation in Algebird; we should have implemented an immutable heap there.
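A minimal sketch of what an immutable heap buys here. It is a simplified pairing heap written in the spirit of cats-collections' `PairingHeap`, not a copy of that code, and the names `PairingHeap`, `Node` and `sortedTake` are hypothetical. Since `meld` only allocates new nodes, combining partial results never modifies a value the runner handed over.

```scala
// Simplified immutable pairing heap (a sketch, not cats-collections' code).
object PairingHeap {
  // A non-empty heap: the root holds the minimum under the ordering in use.
  final case class Node[A](min: A, children: List[Node[A]])

  // Link two heaps: the root with the larger value becomes a child of the other.
  def meld[A](x: Node[A], y: Node[A])(implicit ord: Ordering[A]): Node[A] =
    if (ord.lteq(x.min, y.min)) Node(x.min, y :: x.children)
    else Node(y.min, x :: y.children)

  def insert[A](h: Option[Node[A]], a: A)(implicit ord: Ordering[A]): Option[Node[A]] =
    Some(h.fold(Node(a, Nil))(root => meld(root, Node(a, Nil))))

  // Standard two-pass pairing: meld children pairwise from the left,
  // then combine the resulting pairs from the right.
  def mergePairs[A](hs: List[Node[A]])(implicit ord: Ordering[A]): Option[Node[A]] =
    hs match {
      case Nil => None
      case h :: Nil => Some(h)
      case a :: b :: t =>
        val ab = meld(a, b)
        Some(mergePairs(t).fold(ab)(rest => meld(ab, rest)))
    }

  def removeMin[A](h: Node[A])(implicit ord: Ordering[A]): Option[Node[A]] =
    mergePairs(h.children)

  // Hypothetical helper: the k smallest items under `ord`, in ascending order.
  // The heap uses `ord.reverse`, so its root is the largest item kept so far
  // and is evicted whenever more than k items are held.
  def sortedTake[A](items: Iterable[A], k: Int)(implicit ord: Ordering[A]): List[A] = {
    val rev = ord.reverse
    val (heap, _) = items.foldLeft((Option.empty[Node[A]], 0)) { case ((h, n), a) =>
      val grown = insert(h, a)(rev)
      if (n + 1 > k) (grown.flatMap(root => removeMin(root)(rev)), n)
      else (grown, n + 1)
    }
    // Pop largest-first and prepend, which leaves the list in ascending order.
    @annotation.tailrec
    def drain(h: Option[Node[A]], acc: List[A]): List[A] = h match {
      case None => acc
      case Some(root) => drain(removeMin(root)(rev), root.min :: acc)
    }
    drain(heap, Nil)
  }
}
```

For example, `PairingHeap.sortedTake(List(5, 3, 9, 1, 7), 3)` keeps `List(1, 3, 5)`. A monoid built on this would meld two partial heaps and evict until at most k items remain (tracking sizes alongside).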

nownikhil commented 3 years ago

Should we add a dependency on Cats to Scalding, or copy and paste the implementation?

johnynek commented 3 years ago

I would copy it, along with the tests, to avoid the dependency. (I wrote the pairing heap, btw.) Just keep the copyright headers; the license is the same, so it would be fine.

But if you don't want to spend that time now, the pattern matching approach would also work without that.

tlazaro commented 3 years ago

Implementing this on #1949. I think we can close this issue and follow up there.