JuliaData / DataFrames.jl

In-memory tabular data in Julia
https://dataframes.juliadata.org/stable/

Faster groupby #1896

Closed bkamins closed 3 years ago

bkamins commented 5 years ago

@nalimilan - I am not sure if this is on your to-do list, but today @xiaodaigh and I discussed essentially the following:

julia> using DataFrames, StatsBase

julia> using BenchmarkTools

julia> df = DataFrame(x=rand(1:10, 10^8));

julia> @benchmark countmap($df[!, :x])
BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  7
  --------------
  minimum time:     591.505 ms (1.73% GC)
  median time:      673.740 ms (12.18% GC)
  mean time:        670.389 ms (11.89% GC)
  maximum time:     733.736 ms (18.66% GC)
  --------------
  samples:          8
  evals/sample:     1

julia> @benchmark by($df, :x, counts=:x=>length)
BenchmarkTools.Trial:
  memory estimate:  3.24 GiB
  allocs estimate:  145
  --------------
  minimum time:     2.122 s (6.20% GC)
  median time:      2.290 s (13.26% GC)
  mean time:        2.326 s (14.52% GC)
  maximum time:     2.568 s (22.52% GC)
  --------------
  samples:          3
  evals/sample:     1

So there is significant room for improvement if, instead of materializing a GroupedDataFrame via groupby, by moved through the data frame in "one shot" and produced the result directly. The most common functions for which we have "special handling", like length, sum, mean etc. (essentially all functions for which we can perform online updating), could go into it.
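To make the idea concrete, here is a minimal sketch of such a one-shot reduction for sum (onepass_sum is a hypothetical helper for illustration, not DataFrames.jl API):

function onepass_sum(keys::AbstractVector, vals::AbstractVector)
    # single pass: update the per-group accumulator online,
    # never materializing a GroupedDataFrame
    acc = Dict{eltype(keys), eltype(vals)}()
    for (k, v) in zip(keys, vals)
        acc[k] = get(acc, k, zero(v)) + v
    end
    return acc
end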

If we made this change, I think we would be really competitive with data.table.

What is your perspective on this?

nalimilan commented 5 years ago

AFAICT, part of the difference is due to the fact that countmap uses radix sort by default; it's about 60% slower with alg=:dict. So we would also have to use radix sort to match that performance (but it only works for bitstypes). That would probably not be too hard: we just need an additional row_group_slots method for these types (used only when hash is false).
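For reference, countmap exposes the algorithm choice via its alg keyword argument, so the two strategies can be compared directly:

using StatsBase, BenchmarkTools

x = rand(1:10, 10^8)
@benchmark countmap($x)              # default: radix sort for bitstypes
@benchmark countmap($x, alg = :dict) # hash-table path; ~60% slower here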

Then, profiling shows that a significant share of the time (about 25%) is spent outside row_group_slots, and that part could probably be avoided when calling by with special functions. We could specialize by on these functions to avoid creating a GroupedDataFrame and save that time. But that would probably make the code significantly more complex, since currently all the special reductions are handled by combine, using quite convoluted code. I guess the cleanest approach would be to have by create a special GroupedDataFrame with all fields empty except groups, and have _combine fill the remaining fields only if needed.

bkamins commented 5 years ago

Thank you for dissecting this 😄. Actually, the reason we talked about it with @xiaodaigh is exactly that in some cases we could also detect that radix sort is the right choice, as he is working on it.

I agree that some changes might add significantly to the complexity and volume of the code, but at some point that is probably unavoidable if we want top performance (I am not saying we should do it now or in the near future, but this is something that would potentially give big gains, e.g. in the H2O benchmarks we could probably close the gap to data.table in common cases).

xiaodaigh commented 5 years ago

Also, with the introduction of the multithreading model, we may be able to get even more speedups.

jeremiedb commented 5 years ago

Regarding competing benchmarks against data.table, I noticed that using a histogram approach for functions such as sum (or mean) resulted in even faster aggregation than data.table (at least for 500 groups of 1000 observations each × 100 columns).

Data prep:

using DataFrames, StatsBase
group_size = 1000
group_n = 500
ncols = 100
data = randn(Float64, group_n*group_size, ncols)
group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)

df1 = DataFrame(data)
df1.group = group

Histogram binning approach:

# Accumulate per-group sums for each column: `idx` maps each row to its
# group index, and `key` is used only for its length (the number of groups).
function sum_test_key(mat::Matrix{T}, idx::Vector{Int}, key) where {T}
    res = zeros(T, length(key), size(mat,2))
    for j in 1:size(mat,2)
        for i in 1:size(mat,1)      # innermost loop over rows: column-major friendly
            res[idx[i], j] += mat[i,j]
        end
    end
    return res
end

Benchmark: DataFrames.jl aggregate vs ad-hoc binning:

@time df1_sum = aggregate(df1, :group, sum)
# Reuse existing groupby: 
@time g = groupby(df1, :group)
@time df1_sum_mat = sum_test_key(data, g.groups, g.starts)

  0.602296 seconds (350.71 k allocations: 48.272 MiB, 1.04% gc time)
  0.015109 seconds (157 allocations: 15.457 MiB, 17.90% gc time)
  0.083061 seconds (6 allocations: 390.859 KiB)

So, roughly 0.1 s for the second approach (groupby + sum), where data.table took about 0.14 s on data of the same size. The above can also easily be multi-threaded with Threads.@threads, and then took only 0.022 s on an 8-thread laptop.
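For illustration, a threaded variant of sum_test_key along those lines might look as follows (a sketch; the parallel writes are safe only because each task owns a distinct column j of res):

using Base.Threads

function sum_test_key_threaded(mat::Matrix{T}, idx::Vector{Int}, key) where {T}
    res = zeros(T, length(key), size(mat,2))
    @threads for j in 1:size(mat,2)   # one column per task, no write conflicts
        for i in 1:size(mat,1)
            res[idx[i], j] += mat[i,j]
        end
    end
    return res
end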

Performing the above matrix-based operation on a DataFrame resulted in poor performance because of type instability, but I guess there must already be a mechanism in DataFrames to avoid that issue? @bkamins

In short, it appears there could be substantial speedups through specialized methods for mean, sum and other functions, which happen to be quite common in real life (and in benchmarks).

Would it be realistic to take advantage of such an approach within DataFrames.jl?

I know @xiaodaigh already had optimizations relating to strings pending; I am not sure if those used the same kind of tricks? The binning approach here is applicable to any type underlying the groupby key.

xiaodaigh commented 5 years ago

@jeremiedb There are three concepts there:

  1. groupreduce, i.e. realizing that things like min, max, and sum can be computed as reductions,
  2. multithreading,
  3. better algorithms for types with a small finite number of values (see the sketch below).

Many of these have been proposed for data.table (https://github.com/Rdatatable/data.table/issues/2458) by me.

So I think data.table and Julia can both implement these. You may want to keep an eye on https://github.com/xiaodaigh/FastGroupBy.jl as that can be an experimental ground for these types of things. It's not clear if Julia will still be faster if both implement the same ideas, but they might be easier to implement in Julia because it's a higher-level language.
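As an illustration of concept 3, a minimal sketch (a hypothetical helper, not FastGroupBy.jl or DataFrames.jl API): when the keys fall in a small known range, counts can be accumulated in a plain array instead of a hash table.

# Count occurrences of integers known to lie in lo:hi using direct indexing.
function range_counts(x::AbstractVector{<:Integer}, lo::Integer, hi::Integer)
    counts = zeros(Int, hi - lo + 1)
    for v in x
        counts[v - lo + 1] += 1
    end
    return counts
end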

I can't run your code, but I think your code will struggle if group_n is large, because of scattered cache usage:

using DataFrames, StatsBase

group_size = 1000
group_n = 2^16
ncols = 1
data = randn(Float64, group_n*group_size, ncols)

df1 = DataFrame(data)
df1.group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)

using FastGroupBy: fastby
using CategoricalArrays  # provides categorical and compress

df1[!, :group] = compress(categorical(df1[!, :group]))

using BenchmarkTools
@benchmark df1_sum = aggregate(df1, :group, sum)
@benchmark g = by(df1, :group, ok = :x1 => sum)
@benchmark g = fastby(sum, df1, :group, :x1)

See how your code compares to the above 3 on just one column.

jeremiedb commented 5 years ago

I've made the adjustments in the above code to add the DataFrames and StatsBase dependencies.

Here is how the benchmarks go with 2^16 groups of 1000 rows each, 1 column:

@btime df1_sum = aggregate(df1, :group, sum)
# 3.977 s (4979960 allocations: 1.77 GiB)

@btime g = by(df1, :group, ok = :x1 => sum)
# 2.320 s (187 allocations: 1008.82 MiB)

@btime g = fastby(sum, df1, :group, :x1);
# 2.161 s (197027 allocations: 1.76 GiB)

And with an ad-hoc adaptation of the function to iterate over a single column:

function sum_test_single(mat::Matrix{T}, idx::Vector{Int}, key) where {T}
    res = zeros(T, length(key), size(mat,2))
    for j in 1:1                    # only the first column
        for i in 1:size(mat,1)
            res[idx[i], j] += mat[i,j]
        end
    end
    return res
end
@btime g_groupby = groupby(df1, :group)
# 2.186 s (61 allocations: 1001.57 MiB)

@btime g = sum_test_single(data, g_groupby.groups, g_groupby.starts)
# 168.918 ms (2 allocations: 512.08 KiB)

So I was surprised by the by performance compared to aggregate; I thought they would have shared the same optimizations.

Otherwise, it seems like the total time of the histogram/binning approach matches that of by, though it highlights that with a very high number of groups the bottleneck is clearly getting the group identifiers rather than the crunching part. And I realized that I should switch from aggregate to by!

xiaodaigh commented 5 years ago

@jeremiedb you cannot know the number of groups beforehand if your column is a Vector{Int64}, because there would be 2^64 possible groups!

bkamins commented 5 years ago

Two small comments:

  1. you know the number of groups is not larger than the number of rows of the data frame (I am not sure that is useful in your case though)
  2. @nalimilan has implemented optimized methods for the sum, mean etc. cases already

Though the difference between aggregate and by is big: are you sure it is not due to compilation?

nalimilan commented 5 years ago

@jeremiedb AFAICT we already implement the approach you describe in by/combine: https://github.com/JuliaData/DataFrames.jl/blob/6e287a17aa104f19ca30dfca05bd9825b5048efa/src/groupeddataframe/grouping.jl#L616-L642 As @xiaodaigh noted, this algorithm requires you to know in advance the number of groups, and also that groups are consecutive integers. groupby generates such group indices, but that takes some significant time (we have optimized paths for PooledArray and CategoricalArray since these already give consecutive integers).
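For reference, this is why PooledArray (and CategoricalArray) columns hit the fast path: the pool already maps each value to a small consecutive integer code, so both requirements are satisfied up front. A small illustration:

using PooledArrays

x = PooledArray(["a", "b", "a", "c", "b"])
x.refs          # consecutive integer group codes for each row, e.g. 1, 2, 1, 3, 2
length(x.pool)  # number of groups, known in advance (3 here)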

aggregate is indeed slower than by because it doesn't use the specialized code for reductions. Fixing that shouldn't be too hard, but for some time we've considered deprecating it in favor of by. In the end we might keep it, in which case we should make it as fast as by.

xiaodaigh commented 5 years ago

FastGroupBy.jl is almost obsolete now!

nalimilan commented 5 years ago

I think we still need to implement radix sort for grouping on numeric or string columns. Currently only PooledArray and CategoricalArray use optimized methods in groupby.

jeremiedb commented 5 years ago

@bkamins For aggregate, the difference is indeed very large and I can confirm the spread. Back to 500 groups of 1000 obs each, 100 columns:

group_size = 1000
group_n = 500
ncols = 100
data = randn(Float64, group_n*group_size, ncols)
group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)
df1 = DataFrame(data)
df1.group = group

@btime df1_sum = aggregate(df1, :group, sum)
# 487.722 ms (350701 allocations: 48.27 MiB)

transforms = [(Symbol("x$i") => sum) for i in 1:100]
@btime g = by(df1, :group, transforms)
# 58.436 ms (2869 allocations: 17.25 MiB)

I wouldn't mind seeing aggregate disappear, or otherwise stressing the potential performance caveats when no optimization applies, as it may keep others like me from considering doing their data crunching with raw matrices :P

That said, I'm very happy with what I see in the by benchmarks; it feels like an appealing alternative to data.table. And sorry about the distraction on aggregate!

bkamins commented 4 years ago

I am adding the 2.0 milestone as it would be nice to have better performance at some point.

nalimilan commented 4 years ago

Maybe use 1.x for non breaking changes that you want to prioritize? There's no reason to wait for 2.0.

bkamins commented 4 years ago

We have no breaking issues nor PRs marked 2.0 (except for requested changes to printing, which are breaking, but only in a minor way). So I will simply change this milestone to 1.x. Then we will use 2.0 for breaking changes after the 1.0 release.

tkf commented 4 years ago

FYI, https://github.com/tkf/ThreadsX.jl has parallel quicksort (stable and unstable), merge sort, and counting sort.

I also have a working parallel MSD radix sort (which is usable for long strings, vectors, and composite objects) in a private repository and am cleaning it up for release. I'm also thinking of implementing parallel quicksort with multiple pivots, since it can share some implementation with radix sort. I don't know when I can finish it, though.

bkamins commented 4 years ago

Very interesting. I think adding threading support to DataFrames.jl will be one of the points of work after the 1.0 release (so that we have a stable API to work with).

tkf commented 4 years ago

I guess groupby etc. can "just" take an alg (sortalg?) keyword argument and propagate it to the sortperm call? This can happen after 1.0, of course.
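For reference, Base's sortperm already accepts such an alg argument, which is what the keyword would presumably propagate:

v = rand(1:10, 100)
p = sortperm(v; alg = QuickSort)  # members of each group become contiguous in v[p]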

nalimilan commented 4 years ago

Currently we don't use sorting for grouping at all, but a hash table (except for CategoricalArray and PooledArray columns). So that will require a bit of refactoring. Knowing that a parallel implementation is available makes it more appealing!
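Conceptually, the hash-based approach assigns group indices like this (a simplified sketch, not the actual row_group_slots implementation):

function hash_group_ids(x::AbstractVector)
    ids = Dict{eltype(x), Int}()            # value => group index
    groups = Vector{Int}(undef, length(x))
    for (i, v) in pairs(x)
        groups[i] = get!(ids, v, length(ids) + 1)  # new values get the next index
    end
    return groups, length(ids)
end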

bkamins commented 4 years ago

We use sorting if the user asks for groups to be sorted (when sort=true), but I guess this is cheap relative to the whole cost of groupby.
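Usage for reference:

using DataFrames

df = DataFrame(x = [3, 1, 2, 1, 3], y = 1:5)
groupby(df, :x)              # default: group order is not guaranteed
groupby(df, :x, sort = true) # group keys in sorted order, at some extra cost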

nalimilan commented 4 years ago

Yeah, it should be very cheap if the number of rows per group is large. If you have only a few rows per group the cost can be more significant, and in that case using sorting to group would be a big gain, I guess.

tkf commented 4 years ago

Oh, I missed that it's hash-based in DataFrames.

BTW, if you want to support sorting-based groupby for many types, you might be interested in https://github.com/JuliaLang/julia/issues/34815. For example, ATM you can't sort complex numbers, so you can't do groupby on a complex-valued column when using sort. It may be better to define isless on Complex (but not <, of course), so that it is sortable. (edit: Ah, never mind. I guess you'd use radix sort in this case anyway.)
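To illustrate the sortability gap mentioned above:

julia> sort([1 + 2im, 0 + 1im])
ERROR: MethodError: no method matching isless(::Complex{Int64}, ::Complex{Int64})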

xiaodaigh commented 4 years ago

This is essentially done right?

nalimilan commented 4 years ago

No, we still use hash table-based grouping for integers. Though it would be easy to at least use the same optimized method as for PooledArray/CategoricalArray when the range of the values is small.
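A sketch of the kind of range check that would enable this (the threshold here is purely illustrative, not the actual heuristic):

x = rand(1:10, 10^6)
lo, hi = extrema(x)
# take the direct, array-indexed path only when the key range is narrow
use_direct = (hi - lo) < 2 * length(x)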

nalimilan commented 3 years ago

We now use the optimized method for integer columns with reasonably narrow ranges. It's now a bit faster than countmap:

julia> using DataFrames, StatsBase

julia> using BenchmarkTools

julia> df = DataFrame(x=rand(1:10, 10^8));

julia> @benchmark countmap($df[!, :x])
BenchmarkTools.Trial: 
  memory estimate:  762.94 MiB
  allocs estimate:  7
  --------------
  minimum time:     534.464 ms (0.00% GC)
  median time:      548.505 ms (0.03% GC)
  mean time:        573.655 ms (4.87% GC)
  maximum time:     624.263 ms (11.46% GC)
  --------------
  samples:          9
  evals/sample:     1

julia> @benchmark combine(groupby($df, :x), :x => length => :counts)
BenchmarkTools.Trial: 
  memory estimate:  762.96 MiB
  allocs estimate:  262
  --------------
  minimum time:     424.528 ms (0.00% GC)
  median time:      445.360 ms (0.08% GC)
  mean time:        461.313 ms (6.48% GC)
  maximum time:     506.731 ms (14.50% GC)
  --------------
  samples:          11
  evals/sample:     1

julia> @benchmark combine(groupby($df, :x), nrow => :counts)
BenchmarkTools.Trial: 
  memory estimate:  762.96 MiB
  allocs estimate:  247
  --------------
  minimum time:     422.463 ms (0.00% GC)
  median time:      469.537 ms (0.08% GC)
  mean time:        480.697 ms (6.23% GC)
  maximum time:     533.588 ms (13.26% GC)
  --------------
  samples:          11
  evals/sample:     1