dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

[P2P] `merge_unpack` does not reschedule on `ShuffleClosedError` #8660

Open · hendrikmakait opened this issue 3 weeks ago


I just noticed that `merge_unpack` (https://github.com/hendrikmakait/distributed/blob/9fae5dacf4d2cfd5c659e472b0a3ef307d695863/distributed/shuffle/_merge.py#L163-L196) isn't wrapped in `handle_unpack_errors`, unlike `shuffle_unpack` (https://github.com/hendrikmakait/distributed/blob/9fae5dacf4d2cfd5c659e472b0a3ef307d695863/distributed/shuffle/_shuffle.py#L94-L101).

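As a result, a P2P merge may fail instead of being rescheduled when a `ShuffleClosedError` is raised during unpacking, so it will not restart in some edge cases. (I'm not sure which cases exactly, but some existing P2P shuffling tests should start failing if we remove `handle_unpack_errors` from `shuffle_unpack`.)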
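A minimal, self-contained sketch of the difference, assuming `handle_unpack_errors` turns a `ShuffleClosedError` into a `Reschedule` and wraps any other error in a `RuntimeError`. The exception classes, the `_get_output_partition` helper, and the `merge_unpack_unwrapped` / `merge_unpack_wrapped` functions below are hypothetical stand-ins, not the real `distributed` code; they only illustrate why the missing wrapper matters and what wrapping `merge_unpack` could look like.

```python
import contextlib
from collections.abc import Iterator


# Hypothetical stand-ins for the real classes in distributed (illustrative only).
class ShuffleClosedError(RuntimeError):
    """Raised when the shuffle run a task belongs to has been closed."""


class Reschedule(Exception):
    """Signals the scheduler to run the task again instead of failing it."""


@contextlib.contextmanager
def handle_unpack_errors(shuffle_id: str) -> Iterator[None]:
    # Assumed behavior of the wrapper used by shuffle_unpack:
    # a closed shuffle becomes a Reschedule; anything else is a hard failure.
    try:
        yield
    except Reschedule:
        raise
    except ShuffleClosedError:
        raise Reschedule() from None
    except Exception as e:
        raise RuntimeError(
            f"P2P shuffling {shuffle_id} failed during unpack phase"
        ) from e


def _get_output_partition(shuffle_id: str, partition: int):
    # Hypothetical worker-side lookup that fails because the run was closed,
    # e.g. after the shuffle was restarted.
    raise ShuffleClosedError(f"Shuffle {shuffle_id} is closed")


def merge_unpack_unwrapped(shuffle_id: str, partition: int):
    # Behavior described in the issue: no wrapper, so the error escapes.
    return _get_output_partition(shuffle_id, partition)


def merge_unpack_wrapped(shuffle_id: str, partition: int):
    # Possible fix: wrap the body the same way shuffle_unpack is wrapped.
    with handle_unpack_errors(shuffle_id):
        return _get_output_partition(shuffle_id, partition)


def outcome(func) -> str:
    # Report what the scheduler would see when the task runs.
    try:
        func("merge-abc", 0)
    except Reschedule:
        return "rescheduled"
    except ShuffleClosedError:
        return "failed: ShuffleClosedError"
    return "ok"


print(outcome(merge_unpack_unwrapped))  # failed: ShuffleClosedError
print(outcome(merge_unpack_wrapped))    # rescheduled
```

Without the wrapper the task simply errs, while the wrapped version signals the scheduler to retry it, which is the restart path that `shuffle_unpack` already gets.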