twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0
3.48k stars, 703 forks

scalding-beam custom join functions #1959

Closed: tlazaro closed this 2 years ago

tlazaro commented 2 years ago

Following the discussion in #1958, this attempts a custom join implementation for the Beam backend.

tlazaro commented 2 years ago

It's unfortunate to have this duplication but it looks like the right direction to me.

I added some comments.

Does this solve the performance issue you had with the 7-way join?

Thanks for the review! I haven't tested it yet; maybe we can do so next week. I will try out the iterator/stream.

tlazaro commented 2 years ago

@johnynek the massive join succeeded with this change!

The job then went on to fail on a shard(16) that is totally unnecessary in this case on Dataflow. I'm not sure whether we can, or should, have a rule for how to map that, but I'm leaning towards not sharding at all: maybe just a Reshuffle on Beam, with no custom grouping. That's a problem for another PR. In this case I think it's fine to just remove the sharding from the user job.

johnynek commented 2 years ago

That's great news! Let's make sure to comment well so people don't wonder why this is done.

btw: note this:

https://github.com/twitter/scalding/issues/1639

I think that is a real issue that should be addressed... We keep kind of punting on it. It could possibly be related to your problem here.

https://github.com/twitter/scalding/blob/de2c7c20d7a36a209b81fefa0408a74ef947833c/scalding-core/src/main/scala/com/twitter/scalding/typed/functions/Functions.scala#L137

I don't see any reason to do nextInt(modulus). I think it only reduces the available entropy. It is better to call rng.nextInt() to get the full entropy, and then apply the modulus only at the end.
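A minimal sketch of that suggestion, not the code in Functions.scala: the object and method names below are hypothetical, and the only assumption about the existing code is that it calls nextInt(modulus) as described above. The idea is to keep the full 32-bit draw as the key and reduce it to a bucket only where a bucket is actually needed.

```scala
import java.util.Random

// Hypothetical helpers for illustration; these names are not part of Scalding.
object ShardKeys {
  // The pattern being questioned: the key is limited to [0, modulus) at draw time.
  def boundedKey(rng: Random, modulus: Int): Int = rng.nextInt(modulus)

  // The suggested alternative: draw all 32 bits and keep them as the key.
  def fullEntropyKey(rng: Random): Int = rng.nextInt()

  // Apply the modulus only at the end. floorMod keeps the result in
  // [0, modulus) even when the key is negative, which `%` would not.
  def bucket(key: Int, modulus: Int): Int = java.lang.Math.floorMod(key, modulus)
}
```

Note that rng.nextInt() can return negative values, so a plain `%` at the end would produce negative buckets; floorMod avoids that.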

johnynek commented 2 years ago

btw: it looks like if all we do with these iterables is convert them to java iterables then your current code is likely safe:

https://github.com/scala/scala/blob/4152c02cd8b222ee1782929ecbaf73e85bc8a214/src/library/scala/collection/convert/Wrappers.scala#L23

and will not cause any materializations.
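A small sketch of why the wrapper is lazy, under the assumption that the conversion goes through the standard asJava converter (scala.collection.JavaConverters here; on Scala 2.13 the non-deprecated import is scala.jdk.CollectionConverters). CountingIterable and LazyWrapDemo are hypothetical names used only to observe when elements are produced.

```scala
import scala.collection.JavaConverters._

// Hypothetical demo; not part of this PR.
object LazyWrapDemo {
  // An Iterable that counts how many elements have actually been produced.
  final class CountingIterable(n: Int) extends Iterable[Int] {
    var produced = 0
    def iterator: Iterator[Int] =
      Iterator.range(0, n).map { i => produced += 1; i }
  }

  // Wrapping only stores a reference to the underlying Iterable;
  // elements are pulled one at a time through the Java iterator.
  def wrap(it: Iterable[Int]): java.lang.Iterable[Int] = it.asJava
}
```

After wrap, the counter stays at zero until the Java iterator's next() is called, and then advances one element per call.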

nownikhil commented 2 years ago

@johnynek This code worked with the 7-way join and things look stable. Do you think we can merge this?

johnynek commented 2 years ago

Yes, I think this is certainly in shape to merge. Merge when you like!