dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 712 forks source link

Pandas-only P2P shuffling #8635

Closed hendrikmakait closed 1 month ago

hendrikmakait commented 1 month ago

Supersedes #8606

github-actions[bot] commented 1 month ago

Unit Test Results

_See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests._

    29 files  +    2      29 suites  +2   10h 21m 53s :stopwatch: + 1h 22m 10s  4 053 tests  -     3   3 784 :white_check_mark:  -   159    168 :zzz: +   62  100 :x: + 97  1 :fire:  -  3  51 387 runs  +7 177  48 343 :white_check_mark: +5 922  2 857 :zzz: +1 088  185 :x: +182  2 :fire:  - 15 

For more details on these failures and errors, see this check.

Results for commit 98b1c326. ± Comparison against base commit 1ec61a1b.

This pull request removes 141 and adds 138 tests. Note that renamed tests count towards both. ``` distributed.shuffle.tests.test_graph ‑ test_does_not_raise_on_stringified_numeric_column_name distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[cdouble] distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[clongdouble] distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[csingle] distributed.shuffle.tests.test_graph ‑ test_raise_on_custom_objects distributed.shuffle.tests.test_graph ‑ test_raise_on_non_string_column_name distributed.shuffle.tests.test_graph ‑ test_raise_on_sparse_data distributed.shuffle.tests.test_merge ‑ test_merge[False-inner] distributed.shuffle.tests.test_merge ‑ test_merge[False-left] distributed.shuffle.tests.test_merge ‑ test_merge[False-outer] … ``` ``` distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler_report_args[False] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler_report_args[report_args0] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[1] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[False] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[True] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers_report_args[False] distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers_report_args[report_args0] distributed.shuffle.tests.test_merge ‑ test_merge[disk-inner] distributed.shuffle.tests.test_merge ‑ test_merge[disk-left] … ```
This pull request removes 1 skipped test and adds 59 skipped tests. Note that renamed tests count towards both. ``` distributed.shuffle.tests.test_graph ‑ test_raise_on_custom_objects ``` ``` distributed.shuffle.tests.test_merge ‑ test_merge[memory-inner] distributed.shuffle.tests.test_merge ‑ test_merge[memory-left] distributed.shuffle.tests.test_merge ‑ test_merge[memory-outer] distributed.shuffle.tests.test_merge ‑ test_merge[memory-right] distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-1] distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-20] distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-None] distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-1-10] distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-1-1] distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-10-10] … ```
This pull request skips 5 and un-skips 1 tests. ``` distributed.shuffle.tests.test_merge distributed.shuffle.tests.test_metrics distributed.shuffle.tests.test_shuffle distributed.shuffle.tests.test_shuffle ‑ test_processing_chain[False] distributed.shuffle.tests.test_shuffle ‑ test_processing_chain[True] ``` ``` distributed.tests.test_nanny ‑ test_no_unnecessary_imports_on_worker[pandas] ```

:recycle: This comment has been updated with latest results.

rjzamora commented 1 month ago

@hendrikmakait - Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

mrocklin commented 1 month ago

Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

I'll let @hendrikmakait answer fully, but my sense is that early experiments showed substantial speedups (like 30% on full TPC-H Benchmarks, not just on the shuffling portion).

Would be nice in the long run if any logic using pandas internals could be dispatched.

I can see how this would be convenient for RAPIDS folks (although probably a little inconvenient for others due to complexity). If we go this direction then maybe helping address/reduce this complexity is something that RAPIDS people could help design/execute/maintain?

rjzamora commented 1 month ago

my sense is that early experiments showed substantial speedups (like 30% on full TPC-H Benchmarks, not just on the shuffling portion).

Very nice!

I can see how this would be convenient for RAPIDS folks (although probably a little inconvenient for others due to complexity).

This is a breaking change for rapids "as is". I suppose it would be "convenient" for rapids to continue working with p2p shuffling :)

If we go this direction then maybe helping address/reduce this complexity is something that RAPIDS people could help design/execute/maintain?

I will certainly help with all of the above if doing so is both welcome and feasible. Feasibility probably depends on it being possible to isolate the logic that interacts with pandas internals. If organizing the code in this way really is "too complex", then cudf may need to rely on a completely distinct version of "p2p" to avoid slowing down Coiled engineers. However, my hope is that this is a natural way to organize things even if cudf wasn't in the picture. (fingers crossed)

For this PR, it seems fine to focus on pandas-only details (especially for now). @hendrikmakait - Perhaps we can meet offline so I can figure out whether rapids needs to prioritize (1) rolling-back p2p support, or (2) enabling the pyarrow-free approach with cudf-backed data.

hendrikmakait commented 1 month ago

@hendrikmakait - Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

There are three main benefits I see here:

  1. As @mrocklin mentioned, we have seen significant performance improvements with this. I plan to share some benchmarks later. (The Arrow-based implementation has never been optimized for performance and has some shortcomings like excessive data copying.)
  2. It removes the Arrow dependency, a consistent source of pain because of semantics differing from pandas and its strict typing.
  3. It removes complexity that never worked as intended, e.g., disk-write buffering.

For this PR, it seems fine to focus on pandas-only details (especially for now). @hendrikmakait - Perhaps we can meet offline so I can figure out whether rapids needs to prioritize (1) rolling-back p2p support, or (2) enabling the pyarrow-free approach with cudf-backed data.

Sounds good! I'll ping you offline. Looking at the code we currently have in place, I expect it to be fairly straightforward to dispatch the library-specific pieces of code or to dispatch instantiation of shuffle runs and implement a cuDF-specific version.

rjzamora commented 1 month ago

I've been experimenting with PR using cudf-backed data. I think it will take some work on the rapids side to bring performance in line with main (tpch queries take a ~2x perf hit with dask-cudf), but I wouldn't consider cudf + p2p performance to be a big priority yet. My near-term priority is to keep cudf + p2p "functional". This can be achieved in a fairly simple way by defining/using the following dispatch functions:

def deconstruct_dataframe_shard(shard: pd.DataFrame) -> tuple[Any]:
    """ Deconstruct a DataFrame shard into the information needed to reconstruct it later

    Dispatch on the type of `shard`.
    The elements of the result should be whatever is needed by `restore_dataframe_shard`
    """
    ...

def restore_dataframe_shard(meta: pd.DataFrame, unpickled_shard: tuple[Any]) -> pd.DataFrame:
    """ Reconstruct an un-pickled DataFrame shard

    Dispatch on the type of `meta`.
    `unpickled_shard` corresponds to the round-tripped output of `deconstruct_dataframe_shard`.
    Converts a tuple of un-pickled data (e.g. index  and blocks) back to a DataFrame.
    """
    ...

For the sake of generality, pickle_dataframe_shard would look something like:

def pickle_dataframe_shard(input_part_id: int, shard: pd.DataFrame) -> list[pickle.PickleBuffer]:
    return pickle_bytelist(
        (input_part_id,) +  deconstruct_dataframe_shard(shard),
        prelude=False,
    )

and unpickle_and_concat_dataframe_shards would then look something like:

def unpickle_and_concat_dataframe_shards(
    b: bytes | bytearray | memoryview, meta: pd.DataFrame
) -> pd.DataFrame:
    import dask.dataframe as dd

    unpickled_shards = list(unpickle_bytestream(b))
    unpickled_shards.sort(key=first)
    shards = []

    for _, unpickled_shard in unpickled_shards:
        shards.append(restore_dataframe_shard(meta, unpickled_shard))

    return dd.methods.concat(shards, copy=True)

I suppose the only "complex" part of this general suggestion is that the dispatch functions themselves would need to be exposed/defined in dask.dataframe.

hendrikmakait commented 1 month ago

TL;DR: After running several A/B tests and profiling the code, I have concluded not to move forward with this PR because it is not a universal improvement.

Analysis

A/B tests have shown improvements ranging from 5%-30% in end-to-end runtime for almost all TPC-H queries:

Screenshot 2024-05-14 at 16 49 45

However, they have also shown disastrous results for test_set_index and to a lesser extent, test_set_index_uber_lyft.

Screenshot 2024-05-14 at 16 47 51

It looks like this PR is overfitting on OLAP-style queries that include narrow dataframes with few columns and heavily reduce data at the cost of ETL-style transformations that include wide dataframes with many columns.

Profiling the code before and after, I saw that both the transfer and the unpack phase have become significantly more expensive in test_set_index (by a factor of 3.5x and 2.4x, respectively) and we lose much time due to what looks like GIL contention. The Coiled clusters I ran these on also report a GIL contention very close to 1 (and higher than on main).

This is likely due to many small shards with very few rows. I have tried multiple possible quick wins like removing mmap to avoid the impact of page loads during GIL-restricted code paths or using df.drop() instead of del, but none of these had any positive impact. It's currently unclear how to improve this implementation in a way that reduces GIL contention such that ETL-style workloads are not impacted as harshly. There may be upstream improvements we could make that reduce GIL contention but those would also take a while to get released. I also suspect that this is yet another example of the convoy effect, so reducing GIL contention in some parts will have non-trivial effects as it had in previous attempts at improving P2P performance.

hendrikmakait commented 1 month ago

Example py-spy profiles for future reference:

profiles.zip