dask / dask-expr


Data Skew #417

Open mrocklin opened 10 months ago

mrocklin commented 10 months ago

I'm curious about mechanisms to address data skew in sorting/shuffling/merge operations, where some values are far more common than others. Spark people bring this up a lot as a common thing to watch out for, and it's also a non-trivial pain point for some dask.dataframe users.

I think a useful benchmark for exploring these workloads and becoming sensitive to them is the RenRe workflow that some folks here have.

Quantiles help, but maybe not enough?

Maybe what we do today with pre-computing quantiles helps with some of this. It means that common values are more likely to end up alone in their own partition, rather than grouped with lots of other data (which is what would happen if we hashed, for example). Maybe that's why this has historically been less of a concern for us than for Spark people?
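
For intuition, here is a rough illustration in plain pandas/numpy (not dask internals, and the numbers are made up): when partition boundaries come from quantiles of the key column, a dominant value produces repeated boundaries, so after deduplication it largely gets a partition to itself.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
# Skewed key column: the value 0 makes up ~60% of all rows
keys = pd.Series(np.concatenate([
    np.zeros(600_000, dtype=np.int64),
    rng.integers(1, 1_000_000, size=400_000),
]))

# Partition boundaries taken from quantiles, roughly what dask.dataframe
# does when computing divisions for sort/set_index
npartitions = 8
divisions = keys.quantile(np.linspace(0, 1, npartitions + 1)).to_numpy()
print(divisions)
# Several consecutive boundaries collapse to 0, so after deduplication the
# common value ends up mostly alone in its own partition, rather than being
# hashed in alongside many other keys.
```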

Beyond that though, let's consider the case where there is a single value that is very common, and we're doing some join-like operation that expands it even further. How can we avoid memory blowing up?

Solutions

Spark uses iterative algorithms, kind of like how we operate in dask.bag today: they consume chunks of dataframes and emit chunks of dataframes. We could maybe do something similar in some cases.
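
As a rough sketch of that pattern in plain pandas (the function name is illustrative, not an existing dask API), a join could consume one slice of a partition at a time and emit the joined pieces as a stream rather than as one large frame:

```python
import pandas as pd

def streaming_merge(left: pd.DataFrame, right: pd.DataFrame, on, chunk_rows=100_000):
    """Yield pieces of ``left.merge(right, on=on)``, one slice of ``left`` at a time."""
    for start in range(0, len(left), chunk_rows):
        # Each emitted piece is bounded by chunk_rows times the match rate,
        # so the fully expanded join never has to exist in memory at once.
        yield left.iloc[start:start + chunk_rows].merge(right, on=on)
```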

After blockwise fusion we could look at fused expressions and find any that have joins in them. We could probably switch these to run the same set of blockwise operations, but in a for loop over smaller bits of incoming data. This is only beneficial if one of two other kinds of nodes follows the join / expanding operation in the fused expression (case 1 is sketched in the code after this list):

  1. The beginning of an ApplyConcatApply sequence. This likely reduces the data, and then we can throw in an extra concat and combine at the end of the first apply stage.
  2. Writing to parquet/csv, where we would probably write each chunk out as a row group (or similar) to the data sink
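
Here is a minimal sketch of case 1 in plain pandas (names are illustrative, not dask-expr internals): run the expanding join over slices of the incoming partition, reduce each slice immediately, and combine the partial results, which is the extra concat/combine appended to the first apply stage.

```python
import pandas as pd

def join_then_partial_reduce(left: pd.DataFrame, right: pd.DataFrame,
                             on: str, by: str, chunk_rows: int = 100_000) -> pd.DataFrame:
    partials = []
    for start in range(0, len(left), chunk_rows):
        # Run the expanding join on a small slice of the incoming partition
        piece = left.iloc[start:start + chunk_rows].merge(right, on=on)
        # "apply" stage: reduce the expanded piece right away so it never
        # sits in memory alongside the other pieces
        partials.append(piece.groupby(by).sum())
    # extra concat + combine appended to the end of the first apply stage
    return pd.concat(partials).groupby(level=0).sum()
```

For case 2, the same loop could hand each piece straight to the sink instead, e.g. one write_table call per piece on a pyarrow.parquet.ParquetWriter, so that every piece lands as its own row group.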

Timing

This feels important, but not urgent. I don't think that we should think too much about it until after we've got dask-expr replacing dask.dataframe and good performance on benchmarks that don't include much skew.

Probably the one thing to do early-ish is to see how dask-expr and the new p2p system perform on the RenRe benchmark. cc @fjetter

fjetter commented 10 months ago

I remember the RenRe problem suffering mostly from data bloat. There was an outer join + groupby that generated so many groups that we couldn't handle it without split_out, and with split_out shuffle performance was just abysmal. I haven't looked at this in a long time, though.
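
For context, split_out is the dask.dataframe groupby option that spreads a high-cardinality aggregation result across multiple output partitions instead of a single one. A minimal synthetic illustration (not the actual RenRe workload) might look like:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Synthetic stand-in: a groupby that produces a huge number of groups
pdf = pd.DataFrame({"key": np.arange(2_000_000) % 500_000, "x": 1.0})
ddf = dd.from_pandas(pdf, npartitions=16)

# The default puts the whole aggregated result into one output partition;
# split_out shuffles it out across several partitions instead.
result = ddf.groupby("key").x.sum(split_out=8)
```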

Regardless of whether this is a good example, data skew is of course still an issue we should think about.