JuliaData / DataFrames.jl

In-memory tabular data in Julia
https://dataframes.juliadata.org/stable/
Other
1.73k stars 367 forks source link

Weird slowdowns when grouping on PooledArray #2564

Open nalimilan opened 3 years ago

nalimilan commented 3 years ago

It seems row_group_slots for arrays implementing DataAPI.refpool is much slower than it should. Profiling reveals that most of the time is spent on the groups[i] = j line. Yet, calling row_group_slots directly is twice faster than using groupby. The difference seems to be due to the allocation of the groups vector, but allocating the array separately is very fast. Not sure whether that's a benchmarking artifact.

julia> using DataFrames, PooledArrays, BenchmarkTools

julia> N = 10_000_000;

julia> df = DataFrame(x=PooledArray(rand(1:10, N)), y=rand(N));

julia> @btime groupby(df, :x);
  43.736 ms (40 allocations: 76.30 MiB)

julia> @btime DataFrames.row_group_slots($((df.x,)), Val(false), Vector{Int}(undef, nrow(df)), false, false);
  43.233 ms (17 allocations: 76.30 MiB)

julia> @btime DataFrames.row_group_slots($((df.x,)), Val(false), $(Vector{Int}(undef, nrow(df))), false, false);
  16.615 ms (14 allocations: 1.07 KiB)

julia> @btime Vector{Int}(undef, nrow(df));
  2.784 μs (3 allocations: 76.29 MiB)

# Slow (!)
julia> f(df) = DataFrames.row_group_slots((df.x,), Val(false), Vector{Int}(undef, nrow(df)), false, false)
f (generic function with 1 method)

julia> @btime f(df);
  44.118 ms (20 allocations: 76.30 MiB)

This is not new as it also happens both on master and on 0.21.8, and both with Julia 1.5 and 1.6.

bkamins commented 3 years ago

I think it is a benchmarking artifact.

A small thing is that we could replace DataAPI.refpool.(cols) with map(DataAPI.refpool, col) as it should reduce the compilation time. But this is minor. Also as @timholy suggests - this function can be probably split up into several smaller functions depending on sort and skipmissing arguments.

Finally - and I believe this is a major potential gain: if sort=false, skipmissing=false and a single column is passed when refpool <: AbstractVector then we can just check allunique(only(refpools)) and if all levels are used (most of the time they will be used in practice) and then have a fast path function.

Finally in DataAPI.jl we could add a function that for a given array returns information if refpool is guaranteed to be unique (so that we do not have to run a check if allunique(only(refpools)) in that case.

Sorry for a long list of things, but in combination they potentially will improve speed in the most common case (my benchmarks show that at least ~10x speedup is to be gained in the common scenarios) + possibly also compile time latency.

nalimilan commented 3 years ago

I think it is a benchmarking artifact.

You mean that allocating a vector really takes about 25ms?

A small thing is that we could replace DataAPI.refpool.(cols) with map(DataAPI.refpool, col) as it should reduce the compilation time. But this is minor. Also as @timholy suggests - this function can be probably split up into several smaller functions depending on sort and skipmissing arguments.

Sure we can do that.

Finally - and I believe this is a major potential gain: if sort=false, skipmissing=false and a single column is passed when refpool <: AbstractVector then we can just check allunique(only(refpools)) and if all levels are used (most of the time they will be used in practice) and then have a fast path function.

Unfortunately in my tests I couldn't find a faster implementation than the current one for the example above (i.e. with only 10 groups). This is what prompted me to file this issue. Even removing all the code handling multiple columns, skipmissing and sort, I get similar timings. And that doesn't feel right for it to be slower than combine(gd, :y => sum). How and in which cases do you get 10× speedups?

Finally in DataAPI.jl we could add a function that for a given array returns information if refpool is guaranteed to be unique (so that we do not have to run a check if allunique(only(refpools)) in that case.

Yes that's https://github.com/JuliaData/DataFrames.jl/issues/2558 right?

bkamins commented 3 years ago

You mean that allocating a vector really takes about 25ms?

No I mean that I have checked several options with running @time many times (as this is essentially what matters) and they were similar. It is possible that in DataFrames.row_group_slots($((df.x,)), Val(false), $(Vector{Int}(undef, nrow(df))), false, false); something is optimized out that should not be optimized out (but I might be wrong).

How and in which cases do you get 10× speedups?

This is the speedup:

julia> df = DataFrame(x=PooledArray(rand('a':'k', 10^8)));

julia> @time gdf = groupby(df, :x);
  0.326293 seconds (47 allocations: 762.943 MiB, 18.72% gc time)

julia> gdf.groups == collect(Int, DataAPI.refarray(df.x))
true

julia> @time collect(Int, DataAPI.refarray(df.x));
  0.247577 seconds (2 allocations: 762.940 MiB)

julia> @time copy(DataAPI.refarray(df.x));
  0.027378 seconds (2 allocations: 95.368 MiB)

So:

  1. you are right - with collect(Int, DataAPI.refarray(df.x)) the speedup is only around 30% (still it is a lot IMO and I would go for it)
  2. if we allowed gdf.groups to be a vector of something else than Int then it could be much cheaper as you see with copy(DataAPI.refarray(df.x)) (but I am not sure if it is worth it - but maybe yes as we can precompile things, we would have to check if later we do not get a slowdown in compile if we allow gdf.groups to hold something else than Int)

Yes that's #2558 right?

Right - I just wanted to keep all things in one place.

bkamins commented 3 years ago

Update! I have checked PooledArrays.jl#master (which has refarray eltype set to UInt32) and we get:

julia> @benchmark collect(Int, DataAPI.refarray($df.x))
BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  2
  --------------
  minimum time:     231.792 ms (0.14% GC)
  median time:      300.978 ms (21.56% GC)
  mean time:        296.998 ms (20.61% GC)
  maximum time:     367.297 ms (34.96% GC)
  --------------
  samples:          17
  evals/sample:     1

julia> @benchmark collect(UInt32, DataAPI.refarray($df.x))
BenchmarkTools.Trial:
  memory estimate:  381.47 MiB
  allocs estimate:  2
  --------------
  minimum time:     109.974 ms (0.00% GC)
  median time:      113.834 ms (0.00% GC)
  mean time:        146.304 ms (20.27% GC)
  maximum time:     265.318 ms (49.11% GC)
  --------------
  samples:          35
  evals/sample:     1

julia> @benchmark groupby($df, :x)
BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  46
  --------------
  minimum time:     271.016 ms (0.00% GC)
  median time:      340.696 ms (19.26% GC)
  mean time:        335.788 ms (18.19% GC)
  maximum time:     403.206 ms (32.10% GC)
  --------------
  samples:          15
  evals/sample:     1

so indeed groupby has very little overhead. What we could do is to have two allowed values of gdf.groups: UInt32 and Int. With the default UInt32 and Int as a fallback if we have extremely many groups (this fallback will not be triggered in practice often I think as it would require hundreds of GBs of RAM).

As you can see we can win ~50% of time (just by saving space with UInt32 vs Int). This should allow us to move up in https://h2oai.github.io/db-benchmark/ significantly for 50GB tests even without multithreading.

bkamins commented 3 years ago

We could even go for:

mutable struct GroupedDataFrame{T<:AbstractDataFrame, S<:Union{UInt32, Int}}
    parent::T
    cols::Vector{Symbol}                   # column names used for grouping
    groups::Vector{S}                       # group indices for each row in 0:ngroups, 0 skipped
    idx::Union{Vector{S}, Nothing}       # indexing vector sorting rows into groups
    starts::Union{Vector{S}, Nothing}    # starts of groups after permutation by idx
    ends::Union{Vector{S}, Nothing}      # ends of groups after permutation by idx
    ngroups::Int                           # number of groups
    keymap::Union{Dict{Any, S}, Nothing} # mapping of key tuples to group indices
    lazy_lock::Threads.ReentrantLock       # lock is needed to make lazy operations
                                           # thread safe
end

and set S to UInt32 if nrow(parent) < typemax(UInt32) (which in practice is almost always the case). This will significantly reduce memory consumption and should give speed up. What do you think?

The only challenge will be testing against Int (but this should be doable).

What do you think?

nalimilan commented 3 years ago

Note that collect(Int, DataAPI.refarray(df.x)) or collect(DataAPI.refarray(df.x)) give an overly optimistic estimate of the speed we can actually reach, as we need to fill the seen vector to keep track of which groups we have encountered. AFAIK it disables SIMD in the loop so the gain due to using UInt32 will be lower than with collect. So we need to measure that first to make sure it would be worth it.

We could imagine adding a field to PooledArray indicating whether all values are used, in which case we could avoid computing seen. It would be set to true on construction (when appropriate), and set to false when calling setindex! and other functions that can make a level unused. That would help when working e.g. with data imported from CSV, but not once you extract a subset.

One issue with using UInt32 is that for the dict-based fallback method, we don't know in advance how many groups there will be: so we would have to check that we don't reach the threshold, and if so copy everything to a new table. That should be pretty rare, but it makes the code more complex and as you noted it's hard to test (unless we also allow UInt8).

bkamins commented 3 years ago

as we need to fill the seen vector to keep track of which groups we have encountered.

agreed, but I assumed it would be fast (maybe I am wrong).

We could imagine adding a field to PooledArray indicating whether all values are used

I thought about it, but this is problematic, also with views (see https://github.com/JuliaData/PooledArrays.jl/issues/44 as a related issue).

we don't know in advance how many groups there will be:

This is not a problem I think. We can check how many rows a data frame has. If it is less than typemax(UInt32) we are safe to assume there are at most that many groups.


Having said that - I think that the current code is quite fast. However, I believe that even if we keep the current code but switch it to UInt32 it will be faster (less memory movement and larger portions of data will fit into CPU cache). This will have an impact for cases with very many groups (but this is the case we are currently bad at in H2O benchmarks)

nalimilan commented 3 years ago

OK, it seems that using UInt32 indeed takes about 40% less time than Int for the following benchmark:

using Revise, DataFrames, BenchmarkTools, PooledArrays
N = 100_000_000;
df = DataFrame(x=PooledArray(rand(1:10, N), UInt32), y=rand(N));
@btime groupby(df, :x);

I've pushed changes to test UInt32 to the nl/UInt32 branch if you want to have a look (it's not clean, just to make a quick test).

bkamins commented 3 years ago

This is fantastic - I will have a look and report back. Also I will test how later combine performance looks like (to make sure that we do not degrade the performance later). I assume you are using PooledArrays.jl#master (as it always uses UInt32 for refpool which is relevant - that is why I pinged to tag a new release of PooledArrays.jl)

bkamins commented 3 years ago
  1. so combine(gdf, :x => sum) is tad faster on UInt32 (but what is important that it is not slower)
  2. the second comment is to the implementation on master (but we will have the same also on your branch as it is largely unrelated).

For your gdf we have:

julia> @time combine(gdf, :x => sum)
 20.399369 seconds (276 allocations: 3.501 GiB, 2.46% gc time)
10×2 DataFrame
 Row │ x      x_sum
     │ Int64  Int64
─────┼─────────────────
   1 │     2  20008186
   2 │     4  40003392
   3 │     1   9995654
   4 │     9  89992539
   5 │     8  79997288
   6 │     3  30012795
   7 │     6  59993004
   8 │     5  50011285
   9 │    10  99970470
  10 │     7  69987190

julia> @time combine(gdf, sdf -> sum(sdf.x))
  1.487420 seconds (2.00 M allocations: 867.696 MiB, 19.10% gc time)
10×2 DataFrame
 Row │ x      x1
     │ Int64  Int64
─────┼─────────────────
   1 │     2  20008186
   2 │     4  40003392
   3 │     1   9995654
   4 │     9  89992539
   5 │     8  79997288
   6 │     3  30012795
   7 │     6  59993004
   8 │     5  50011285
   9 │    10  99970470
  10 │     7  69987190

can you please cross check that? If this is not some bug on my side this means that we have a significant problem in fast reductions implementation.

EDIT: the performance problem is with sum and mean but not with e.g. nrow or maximum

bkamins commented 3 years ago

@nalimilan - is this issue boiled down to using UInt32 if we know gdf is short? If yes - then probably we should change the title of the issue and mark in 1.x milestone? Right?

nalimilan commented 3 years ago

Yes I think so. Indeed it's not breaking so it can happen after 1.0. Though it could be worth implementing now in order to look good in benchmarks.

I had missed your previous comment. Indeed I also see the very bad performance of combine(gdf, :x => sum) on my branch. I guess there's a type instability somewhere, which doesn't trigger allocations because it's inferred as Union{Int, UInt32}.

bkamins commented 3 years ago

OK - thank you for looking into this (both changing to Int32 and type instability).

At some point same thing can be done for joins, see https://github.com/JuliaData/DataFrames.jl/issues/2688.

bkamins commented 3 years ago

I resolve the slowdown part in https://github.com/JuliaData/DataFrames.jl/pull/2708. The Uint32 part is something I assume you are looking at - is this correct @nalimilan? Thank you!

nalimilan commented 3 years ago

I have a mostly working implementation in the (updated) nl/UInt32 branch. But actually I'm not sure it's really worth the increased complexity. It's about 30% faster for groupby, but given how fast that part is, that doesn't make a big difference even if the only thing you do with the result is call the optimized combine to compute a sum.

using Revise, DataFrames, BenchmarkTools, PooledArrays
N = 100_000_000;
df = DataFrame(x=PooledArray(rand(1:10, N), UInt32), y=rand(N));

# main
julia> @btime groupby(df, :x);
  29.111 ms (40 allocations: 76.30 MiB)

julia> @btime combine(groupby(df, :x), :y => sum);
  61.132 ms (240 allocations: 76.31 MiB)

# branch:
julia> @btime groupby(df, :x);
  21.233 ms (45 allocations: 38.15 MiB)

julia> @btime combine(groupby(df, :x), :y => sum);
  49.567 ms (244 allocations: 38.16 MiB)
bkamins commented 3 years ago

This was also my fear. Adding UInt32 support will increase code complexity and thus increase compilation latency, so maybe it is not worth in practice given that DataFrames.jl is mostly used interactively.

So what do we do. Close it for now and the same with #2688, or we keep them open for the future decision (then I would rename this issue to clearly indicate that using UInt32 is the thing that is to be done).