JuliaData / DataFrames.jl

In-memory tabular data in Julia
https://dataframes.juliadata.org/stable/

Scope for performance improvement of outerjoin when `on` is sorted? #3270

Open chiraganand opened 1 year ago

chiraganand commented 1 year ago

In TSFrames.jl the join function looks like:

function Base.join(
    ts1::TSFrame,
    ts2::TSFrame,
    ts...;
    jointype::Symbol=:JoinAll
)
    result = joinmap[jointype](ts1.coredata, ts2.coredata, on=:Index, makeunique=true)
    for tsf in ts
        result = joinmap[jointype](result, tsf.coredata, on=:Index, makeunique=true)
    end
    return TSFrame(result)
end

In the examples below, the R function is almost 6x faster than TSFrames.join() (median of about 163 ms versus about 970 ms).

R

> library(xts)
> library(microbenchmark)

> d1 = seq.Date(as.Date(0, origin = "0000-01-01"), by = 1, length.out = 10000000);
> d2 = seq.Date(as.Date(365*1000, origin = "0000-01-01"), by = 1, length.out = 10000000);
> ts1 = xts(1:length(d1), d1);
> ts2 = xts(1:length(d2), d2);

> microbenchmark(cbind(ts1, ts2, all = TRUE), times = 100)
 Unit: milliseconds
                         expr      min       lq     mean   median       uq
  cbind(ts1, ts2, all = TRUE) 134.7299 149.9012 166.7997 163.1097 179.3961
      max neval
  235.856   100

Julia

julia> using TSFrames, DataFrames, Dates, BenchmarkTools
julia> d1 = range(Date(0), length=10_000_000, step=Day(1))
julia> d2 = range(Date(1000), length=10_000_000, step=Day(1))
julia> ts1 = TSFrame(1:length(d1), d1);
julia> ts2 = TSFrame(1:length(d2), d2);
julia> @benchmark join(ts1, ts2; jointype=:JoinAll) samples=100
BenchmarkTools.Trial: 6 samples with 1 evaluation.
 Range (min … max):  884.946 ms …   1.043 s  ┊ GC (min … max):  4.01% … 15.44%
 Time  (median):     969.911 ms              ┊ GC (median):     9.85%
 Time  (mean ± σ):   967.573 ms ± 63.127 ms  ┊ GC (mean ± σ):  10.29% ±  5.27%
   <histogram removed>
 Memory estimate: 1.31 GiB, allocs estimate: 394.

I tried writing a simple while loop for joining two TSFrame objects, but just going through that loop and doing all the if-else comparisons 10 million times takes more than 2 seconds, even without storing the results. I was assuming the algorithm below takes advantage of the fact that the :Index column is already sorted, so both indexes can be traversed sequentially, which should save a lot of computation.

function joinnew(ts1::TSFrame, ts2::TSFrame)
    #result = Vector{Tuple{Union{Date, Missing}, Union{Date, Missing}}}()
    index1 = index(ts1)
    index2 = index(ts2)

    i = 1
    j = 1
    last1 = lastindex(index1)
    last2 = lastindex(index2)
    while (i <= last1 && j <= last2)
        if index1[i] < index2[j]
            # push!(result, (index1[i], missing))
            i += 1
        elseif index1[i] == index2[j]
            # push!(result, (index1[i], index2[j]))
            i += 1
            j += 1
        else
            # push!(result, (missing, index2[j]))
            j += 1
        end
    end
    while (i <= last1)
        # push!(result, (index1[i], missing))
        i += 1
    end
    while (j <= last2)
        # push!(result, (missing, index2[j]))
        j += 1
    end
end

julia> @benchmark joinnew(ts1, ts2) samples=100
BenchmarkTools.Trial: 3 samples with 1 evaluation.
 Range (min … max):  2.113 s …   2.186 s  ┊ GC (min … max): 7.37% … 7.54%
 Time  (median):     2.163 s              ┊ GC (median):    7.62%
 Time  (mean ± σ):   2.154 s ± 37.562 ms  ┊ GC (mean ± σ):  7.79% ± 0.58%
  <histogram removed>
 Memory estimate: 1.47 GiB, allocs estimate: 98536480.
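
For reference, a version of the sequential merge that does store its output could look like the sketch below. It assumes both key vectors are sorted and free of duplicates. `sorted_outer_merge` is a hypothetical helper, not a function from TSFrames.jl or DataFrames.jl, and it returns the merged keys plus row positions rather than a joined table.

```julia
using Dates

# Sketch: one-pass outer merge of two sorted, duplicate-free key vectors.
# `sorted_outer_merge` is a hypothetical name used only for illustration.
function sorted_outer_merge(index1::AbstractVector{T}, index2::AbstractVector{T}) where {T}
    merged = T[]                      # union of the keys, in sorted order
    rows1 = Union{Int, Missing}[]     # position of each key in index1, or missing
    rows2 = Union{Int, Missing}[]     # position of each key in index2, or missing
    # The result has at most length(index1) + length(index2) rows.
    n = length(index1) + length(index2)
    sizehint!(merged, n); sizehint!(rows1, n); sizehint!(rows2, n)

    i, j = firstindex(index1), firstindex(index2)
    last1, last2 = lastindex(index1), lastindex(index2)
    while i <= last1 && j <= last2
        a, b = index1[i], index2[j]
        if a < b                      # key only on the left
            push!(merged, a); push!(rows1, i); push!(rows2, missing)
            i += 1
        elseif a == b                 # key on both sides
            push!(merged, a); push!(rows1, i); push!(rows2, j)
            i += 1; j += 1
        else                          # key only on the right
            push!(merged, b); push!(rows1, missing); push!(rows2, j)
            j += 1
        end
    end
    while i <= last1                  # leftover keys from index1
        push!(merged, index1[i]); push!(rows1, i); push!(rows2, missing)
        i += 1
    end
    while j <= last2                  # leftover keys from index2
        push!(merged, index2[j]); push!(rows1, missing); push!(rows2, j)
        j += 1
    end
    return merged, rows1, rows2
end

# Small usage example with two overlapping daily ranges.
d1 = Date(2020, 1, 1):Day(1):Date(2020, 1, 4)
d2 = Date(2020, 1, 3):Day(1):Date(2020, 1, 6)
merged, rows1, rows2 = sorted_outer_merge(d1, d2)
```

Because the function receives the key vectors as arguments, it is compiled for their concrete types. The row positions can then be used to gather the value columns from each side.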

bkamins commented 1 year ago

DataFrames.jl already takes advantage of the data being sorted. The issue is as follows:

julia> @time outerjoin(ts1.coredata, ts2.coredata, on=:Index, makeunique=true);
  0.269071 seconds (257 allocations: 411.998 MiB)

julia> @time join(ts1, ts2; jointype=:JoinAll);
  0.587725 seconds (355 allocations: 1.310 GiB, 17.48% gc time)

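So the major problem is that much of the total cost of the join is not in outerjoin, but in the TSFrame constructor invoked afterwards: outerjoin alone takes about 0.27 s and 412 MiB, out of 0.59 s and 1.31 GiB for the full join.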

Now your joinnew function should take the index vectors as arguments:

function joinnew(index1, index2)
    i = 1
    j = 1
    last1 = lastindex(index1)
    last2 = lastindex(index2)
    while (i <= last1 && j <= last2)
        if index1[i] < index2[j]
            # push!(result, (index1[i], missing))
            i += 1
        elseif index1[i] == index2[j]
            # push!(result, (index1[i], index2[j]))
            i += 1
            j += 1
        else
            # push!(result, (missing, index2[j]))
            j += 1
        end
    end
    while (i <= last1)
        # push!(result, (index1[i], missing))
        i += 1
    end
    while (j <= last2)
        # push!(result, (missing, index2[j]))
        j += 1
    end
end

and then the timing you get is:

julia> @time joinnew(index(ts1), index(ts2));
  0.016491 seconds

which you probably wanted.
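
The gap between roughly 2 s with about 98 million allocations and 0.016 s is what one would expect if the compiler cannot infer the type of the index vectors inside the first joinnew. The thread does not state the cause, so treat this as an assumption. The sketch below reproduces the effect with a hypothetical `Holder` struct whose field is abstractly typed, standing in for a container that hides its column type. All names in it are illustrative.

```julia
# Hypothetical stand-in for a container whose column type is hidden from the
# compiler (an assumption about why the first joinnew is slow).
struct Holder
    index::AbstractVector   # abstract field type: concrete type unknown at compile time
end

# Kernel: compiled separately for each concrete pair of vector types it receives.
function count_matches(index1, index2)
    i, j, n = firstindex(index1), firstindex(index2), 0
    last1, last2 = lastindex(index1), lastindex(index2)
    while i <= last1 && j <= last2
        if index1[i] < index2[j]
            i += 1
        elseif index1[i] == index2[j]
            n += 1; i += 1; j += 1
        else
            j += 1
        end
    end
    return n
end

# Same loop written directly against the abstractly typed fields: element
# accesses and comparisons go through dynamic dispatch on every iteration.
function count_matches_slow(h1::Holder, h2::Holder)
    index1, index2 = h1.index, h2.index
    i, j, n = firstindex(index1), firstindex(index2), 0
    last1, last2 = lastindex(index1), lastindex(index2)
    while i <= last1 && j <= last2
        if index1[i] < index2[j]
            i += 1
        elseif index1[i] == index2[j]
            n += 1; i += 1; j += 1
        else
            j += 1
        end
    end
    return n
end

# Function barrier: extract the vectors once, then hand them to the kernel.
count_matches_fast(h1::Holder, h2::Holder) = count_matches(h1.index, h2.index)

h1 = Holder(collect(1:1_000))
h2 = Holder(collect(501:1_500))
same = count_matches_slow(h1, h2) == count_matches_fast(h1, h2)
```

Both functions return the same count. Timing them on larger inputs with `@time` should show the slow variant allocating on each iteration while the barrier variant does not, though the exact numbers depend on the machine and Julia version.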

What we could do:

Is this clear? (And if yes, how would you want to move forward with the issue?)