spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Use hash join when writing sparkey #5402

Closed: aslotnick closed this 2 months ago

aslotnick commented 2 months ago

When writing to Sparkey, `allShards` contains every expected shard number, even when `shards` has no data for that shard number.
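
The role of `allShards` can be modeled with plain Scala collections. This is a sketch under stated assumptions, not Scio code: shard data is a sequence of `(shardNumber, record)` pairs, shard numbers run from `0` to `numShards - 1`, and `ShardModel` and `joinWithAllShards` are hypothetical names. The point it shows is that joining against the expected shard list keeps an entry for every shard, including empty ones.

```scala
// Hypothetical in-memory model of `shards` and `allShards`; not Scio's SCollection API.
object ShardModel {
  // Every expected shard number, whether or not any record was assigned to it.
  // Assumption: shards are numbered 0 until numShards.
  def allShards(numShards: Int): Seq[Int] = 0 until numShards

  // Right-outer-join semantics keyed on the expected shards: each shard number in
  // `all` yields exactly one entry, and shards with no data get an empty Seq.
  def joinWithAllShards(shards: Seq[(Int, String)], all: Seq[Int]): Map[Int, Seq[String]] = {
    val dataByShard: Map[Int, Seq[String]] =
      shards.groupBy(_._1).map { case (shard, kvs) => shard -> kvs.map(_._2) }
    all.map(shard => shard -> dataByShard.getOrElse(shard, Seq.empty[String])).toMap
  }
}
```

With three expected shards and data only for shards 0 and 2, shard 1 still appears in the result, mapped to an empty sequence.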

`shards.rightOuterJoin(allShards)` (added in https://github.com/spotify/scio/pull/5208) fails when a single shard contains a large amount of data. The failure is the error described in https://github.com/spotify/scio/issues/5300: `java.lang.OutOfMemoryError: Required array length 2147483639 + 15534 is too large`.
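
The numbers in that message can be checked with a little arithmetic. This sketch assumes the message comes from the array-growth check in recent JDKs (`jdk.internal.util.ArraysSupport`), which treats `Int.MaxValue - 8` as a soft maximum array length and throws when `oldLength + minGrowth` overflows an `Int`. `ArrayGrowth` and `growthOverflows` are illustrative names only. Read this way, the array had already reached 2147483639 elements, so any further growth was impossible.

```scala
// Illustrative arithmetic for "Required array length X + Y is too large".
// Assumption: the JDK's growth check rejects a request when oldLength + minGrowth
// does not fit in an Int, and uses Int.MaxValue - 8 as its soft maximum length.
object ArrayGrowth {
  // Soft maximum array length used by recent JDKs: 2147483647 - 8.
  val SoftMaxArrayLength: Int = Int.MaxValue - 8

  // True when growing an array of `oldLength` elements by `minGrowth` elements
  // would need more than Int.MaxValue slots, which no single JVM array can hold.
  def growthOverflows(oldLength: Int, minGrowth: Int): Boolean =
    oldLength.toLong + minGrowth > Int.MaxValue
}
```

Here 2147483639 + 15534 = 2147499173, which exceeds `Int.MaxValue` (2147483647). Growing the array by even a modest amount therefore fails.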

This PR replaces `rightOuterJoin` with `hashFullOuterJoin`, because there is no `hashRightOuterJoin` implementation. Since `allShards` includes every shard number that can appear in `shards`, the full outer join produces the same rows the right outer join did. A hash join fits well here for two reasons: the right-hand side holds very little data (only the shard keys), and the join does not need an array to hold the large left-hand side's values. As a result, some workflows that succeeded in Scio 0.13.* but have failed since will run successfully again.
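
The hash-join idea can be sketched in plain Scala. This is not Scio's `hashFullOuterJoin` implementation. As we understand it, Scio's hash joins replicate the right-hand side to every worker, so that side must be small. The sketch is a classic build/probe hash join: it builds a hash table from the small right side, then scans the large left side one record at a time without grouping its values per key. `HashJoinSketch` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical build/probe hash full outer join; models the idea, not Scio's code.
object HashJoinSketch {
  def hashFullOuterJoin[K, V, W](
      left: Iterator[(K, V)], // large side: scanned record by record
      right: Seq[(K, W)]      // small side: loaded into an in-memory hash table
  ): Vector[(K, (Option[V], Option[W]))] = {
    // Build phase: hash the small side by key.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    // Right-hand keys that matched at least one left record.
    val matched = mutable.Set.empty[K]
    val out = Vector.newBuilder[(K, (Option[V], Option[W]))]

    // Probe phase: look up each left record individually; left values are never
    // collected per key, so no large per-key buffer is needed.
    left.foreach { case (k, v) =>
      rightByKey.get(k) match {
        case Some(ws) =>
          matched += k
          ws.foreach(w => out += ((k, (Some(v), Some(w)))))
        case None =>
          out += ((k, (Some(v), None)))
      }
    }

    // Outer rows: right-hand keys that no left record matched.
    rightByKey.foreach { case (k, ws) =>
      if (!matched.contains(k)) ws.foreach(w => out += ((k, (None, Some(w)))))
    }
    out.result()
  }
}
```

When the right side lists every expected shard, no row has an empty right-hand value. The full outer join then behaves like the right outer join it replaces, and shards with no data appear as `(None, Some(...))`.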

codecov[bot] commented 2 months ago

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 61.26%. Comparing base (6d755f9) to head (bfcec57). Report is 15 commits behind head on main.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #5402      +/-   ##
==========================================
+ Coverage   61.22%   61.26%   +0.03%
==========================================
  Files         310      310
  Lines       11061    11061
  Branches      755      755
==========================================
+ Hits         6772     6776       +4
+ Misses       4289     4285       -4
```
