apache / arrow-julia

Official Julia implementation of Apache Arrow
https://arrow.apache.org/julia/

Benchmark of Arrow.jl vs Pyarrow (/Polars) #393

Open svilupp opened 1 year ago

svilupp commented 1 year ago

First of all, thank you for this amazing package!

Recently, I've been loading a lot of large files and it felt like Arrow.jl loading times were longer than in Python. I wanted to quantify this feeling, so I hacked up a rough benchmark (code + results below).

Observations

Proposals

Benchmarking results

Machine:

Task 1: count the non-missing elements in the first column of a table, repeated 10x
Data: 10K rows, 2 columns of 5,000-character strings each, 10% of values missing
Timings (ordered by: Uncompressed, LZ4, ZSTD):

Data: same as above, but written as 32 partitions (!); 10K rows, 2 columns of 5,000-character strings each, 10% of values missing
Timings (ordered by: Uncompressed, LZ4, ZSTD):

Task 2: mean of the first Int64 column of a table, repeated 10x
Data: 10M rows, 10 Int64 columns
Timings (ordered by: Uncompressed, LZ4, ZSTD):

Data: same as above, but written as 32 partitions (!); 10M rows, 10 Int64 columns
Timings (ordered by: Uncompressed, LZ4, ZSTD):

Benchmark details

benchmark.jl

# Test case 1: 32 Threads available
fn = "data_raw/df_10K_string5K_missing"
@time read_test(fn * "_unc.arrow", 10)
# 0.164394 seconds (92.02 k allocations: 438.760 MiB, 16.59% gc time)
@time read_test(fn * "_lz4.arrow", 10)
# 2.289358 seconds (98.04 k allocations: 2.107 GiB, 18.83% gc time, 0.36% compilation time)
@time read_test(fn * "_zstd.arrow", 10)
# 1.581160 seconds (92.60 k allocations: 1.680 GiB, 3.89% gc time)

# Test case 1: 1 Thread available
fn = "data_raw/df_10K_string5K_missing"
@time read_test(fn * "_unc.arrow", 10)
#  0.199866 seconds (91.94 k allocations: 438.758 MiB)
@time read_test(fn * "_lz4.arrow", 10)
# 2.250688 seconds (92.92 k allocations: 2.106 GiB, 18.67% gc time)
@time read_test(fn * "_zstd.arrow", 10)
# 1.908512 seconds (92.62 k allocations: 1.680 GiB, 20.47% gc time)

# Test case 1: 32 Threads available, arrow file has 32 partitions (+ minor tweak to transcoding function)
fn = "data_raw/df_10K_string5K_missing"
@time read_test(fn * "_unc.arrow", 10; colname=:x1)
# 0.224833 seconds (129.90 k allocations: 442.646 MiB)
@time read_test(fn * "_lz4.arrow", 10; colname=:x1)
# 0.435552 seconds (148.28 k allocations: 1.276 GiB)
@time read_test(fn * "_zstd.arrow", 10; colname=:x1)
# 0.401670 seconds (141.81 k allocations: 1.276 GiB, 13.32% gc time)

# Test case 2: 32 Threads available
fn = "data_raw/df_10M_col10"
@time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
#  0.107945 seconds (6.34 k allocations: 347.328 KiB)
@time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
# 17.237241 seconds (29.70 k allocations: 14.903 GiB, 10.65% gc time, 0.10% compilation time)
@time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
# 6.157455 seconds (7.65 k allocations: 14.902 GiB, 10.48% gc time)

# Test case 2: 1 Thread available
fn = "data_raw/df_10M_col10"
@time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
# 0.101603 seconds (6.28 k allocations: 345.750 KiB)
@time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
# 16.322216 seconds (28.00 k allocations: 14.903 GiB, 6.25% gc time, 0.10% compilation time)
@time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
# 6.311729 seconds (7.58 k allocations: 14.902 GiB, 13.17% gc time)

# Test case 2: 32 Threads available, arrow file has 32 partitions (+ minor tweak to transcoding function)
fn = "data_raw/df_10M_col10"
@time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
# 0.118847 seconds (161.34 k allocations: 8.437 MiB)
@time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
# 1.156759 seconds (200.97 k allocations: 7.460 GiB)
@time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
# 0.655502 seconds (191.44 k allocations: 7.460 GiB)

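For context on the "32 Threads available" vs "1 Thread available" runs above: the post doesn't show how the sessions were launched. The usual approach (my assumption, not stated in the original) is to start Julia with `--threads=32` or set `JULIA_NUM_THREADS`, and a quick sanity check of the thread budget looks like this:

# check how many threads this session has; Arrow.jl can take advantage of them
# when a file contains multiple record batches / compressed buffers
using Base.Threads
println("threads available: ", Threads.nthreads())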
benchmark.py (all 32 threads active, 1 partition/RecordBatch in the Arrow file)

# accidentally overwritten... The numbers are in the summaries at the top, but I can re-run it if there's interest.

benchmark_partitioned.py (all 32 threads active, 32 partitions/RecordBatches in the Arrow files)

### Test case 1: Strings (all 32 threads, 32 partitions)
fn1="df_10K_string5K_missing_unc.arrow"
fn2="df_10K_string5K_missing_lz4.arrow"
fn3="df_10K_string5K_missing_zstd.arrow"
#
%time read_test_pandas(fn1,10)
%time read_test_pandas(fn2,10)
%time read_test_pandas(fn3,10)
#
%time read_test_polars(fn1,10)
%time read_test_polars(fn2,10)
%time read_test_polars(fn3,10)
#
%time read_test_polars_pyarrow(fn1,10)
%time read_test_polars_pyarrow(fn2,10)
%time read_test_polars_pyarrow(fn3,10)

#
CPU times: user 349 ms, sys: 898 ms, total: 1.25 s
Wall time: 1.24 s
CPU times: user 519 ms, sys: 649 ms, total: 1.17 s
Wall time: 1.03 s
CPU times: user 1.11 s, sys: 534 ms, total: 1.64 s
Wall time: 1.23 s
#
CPU times: user 6.06 ms, sys: 13.9 ms, total: 20 ms
Wall time: 8.99 ms
CPU times: user 682 ms, sys: 1.42 s, total: 2.11 s
Wall time: 2.09 s
CPU times: user 1.65 s, sys: 1.19 s, total: 2.85 s
Wall time: 2.84 s
#
CPU times: user 551 ms, sys: 744 ms, total: 1.29 s
Wall time: 1.14 s
CPU times: user 764 ms, sys: 833 ms, total: 1.6 s
Wall time: 1.27 s
CPU times: user 1.37 s, sys: 791 ms, total: 2.16 s
Wall time: 1.54 s

### Test case 2: Integers (all 32 threads, 32 partitions)
fn1="df_10M_col10_unc.arrow"
fn2="df_10M_col10_lz4.arrow"
fn3="df_10M_col10_zstd.arrow"
#
%time read_test_mean_pandas(fn1,10)
%time read_test_mean_pandas(fn2,10)
%time read_test_mean_pandas(fn3,10)
#
%time read_test_mean_polars(fn1,10)
%time read_test_mean_polars(fn2,10)
%time read_test_mean_polars(fn3,10)
#
%time read_test_mean_polars_pyarrow(fn1,10)
%time read_test_mean_polars_pyarrow(fn2,10)
%time read_test_mean_polars_pyarrow(fn3,10)

#
CPU times: user 2.51 s, sys: 8.85 s, total: 11.4 s
Wall time: 5.62 s
CPU times: user 4.1 s, sys: 7.29 s, total: 11.4 s
Wall time: 2.84 s
CPU times: user 4.28 s, sys: 5.44 s, total: 9.72 s
Wall time: 2.61 s
#
CPU times: user 159 ms, sys: 87.1 ms, total: 247 ms
Wall time: 232 ms
CPU times: user 4.23 s, sys: 8.66 s, total: 12.9 s
Wall time: 12.8 s
CPU times: user 4.27 s, sys: 8.33 s, total: 12.6 s
Wall time: 12.6 s
#
CPU times: user 2.53 s, sys: 4.06 s, total: 6.59 s
Wall time: 6.58 s
CPU times: user 4.25 s, sys: 4.91 s, total: 9.15 s
Wall time: 6.49 s
CPU times: user 4.33 s, sys: 4.33 s, total: 8.66 s
Wall time: 6.43 s

benchmark_setup.py

!pip install pandas pyarrow polars
from pathlib import Path
import pandas as pd
import polars as pl
import pyarrow
from pyarrow.feather import write_feather, read_feather

# String isnull tests
def read_test_pandas(fn,n):
  counter=0
  for i in range(n):
    counter+=pd.read_feather(fn).x1.notna().sum()
  return counter

def read_test_polars_pyarrow(fn,n):
  counter=0
  for i in range(n):
    counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].is_not_null().sum()
  return counter

def read_test_polars(fn,n):
  counter=0
  for i in range(n):
    counter+=pl.read_ipc(fn)["x1"].is_not_null().sum()
  return counter

# length test to make sure is_not_null is not cheating by reading just metadata
def read_test_len_polars_pyarrow(fn,n):
  counter=0
  for i in range(n):
    counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].str.lengths().sum()
  return counter

# Integer tests
def read_test_mean_pandas(fn,n):
  counter=0
  for i in range(n):
    counter+=pd.read_feather(fn).x1.mean()
  return counter

def read_test_mean_polars(fn,n):
  counter=0
  for i in range(n):
    counter+=pl.read_ipc(fn)["x1"].mean()
  return counter

def read_test_mean_polars_pyarrow(fn,n):
  counter=0
  for i in range(n):
    counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].mean()
  return counter

benchmark_setup.jl

using Arrow
using DataFramesMeta
using DataFramesMeta: Tables
using BenchmarkTools
using Random
using Statistics  # for `mean` used in read_test_mean

# utility functions for generation
function generate_numeric(N, ::Type{T}=Int; cols=10) where {T<:Number}
    return DataFrame(rand(T, N, cols), :auto)
end
function generate_string(N; len=10, allowmissing=true)
    df = DataFrame(x1=map(x -> randstring(len), 1:N), x2=map(x -> randstring(len), 1:N))
    allowmissing && allowmissing!(df)
    return df
end
function add_rand_missing!(df, p=0.1)
    for col in names(df)
        mask_missing = rand(nrow(df)) .< p
        df[mask_missing, col] .= missing
    end
    df
end
function write_out_compressions(df, fn_base)
    Arrow.write(fn_base * "_unc.arrow", df; compress=nothing)
    Arrow.write(fn_base * "_lz4.arrow", df; compress=:lz4)
    Arrow.write(fn_base * "_zstd.arrow", df; compress=:zstd)
    return nothing
end

# utility functions for reading
function read_test(fn, n; colname=:x1)
    counter = 0
    for i in 1:n
        t = Arrow.Table(fn)
        counter += sum(.!ismissing.(t[colname]))
    end
    return counter
end
function read_test_mean(fn, n; colname=:x1)
    counter = 0
    for i in 1:n
        t = Arrow.Table(fn)
        counter += mean(t[colname])
    end
    return counter
end
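
# Hypothetical Julia counterpart of the Python read_test_len_* check (not in the
# original script): summing the string lengths forces the string data itself to be
# materialized, guarding against a reader that only touches the validity bitmap.
function read_test_len(fn, n; colname=:x1)
    counter = 0
    for i in 1:n
        t = Arrow.Table(fn)
        counter += sum(length, skipmissing(t[colname]))
    end
    return counter
end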

# Test case 1: two columns with wide strings and some missing data
fn = "data_raw/df_10K_string5K_missing"
N=10_000
df = generate_string(N; len=5000) |> add_rand_missing!
write_out_compressions(df, fn); 

# Test case 2: 10 columns of 10M Integers
fn = "data_raw/df_10M_col10"
N = 10_000_000
df = generate_numeric(N, Int)
write_out_compressions(df, fn);
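
The setup script above only writes single-partition files; the 32-partition variants referenced in the timings aren't shown. A minimal sketch of how such a file could be written with Arrow.jl follows — the row-chunking scheme and the `npartitions` keyword are my assumptions, not the benchmark's actual code:

# Write a multi-partition (multi-record-batch) Arrow file by feeding Arrow.write
# a Tables.partitioner over row chunks of the DataFrame.
function write_partitioned(df, fn; npartitions=32, compress=nothing)
    chunk = cld(nrow(df), npartitions)
    parts = Tables.partitioner(view(df, idx, :) for idx in Iterators.partition(1:nrow(df), chunk))
    Arrow.write(fn, parts; compress=compress)
    return nothing
end

# e.g. write_partitioned(df, fn * "_part32_zstd.arrow"; compress=:zstd)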
quinnj commented 1 year ago

Wow! Thanks for the detailed research/investigation/writeup @svilupp! Most of what you mentioned sounds like things we should indeed do. I'm currently bogged down in a few other projects for the next month or two, but I'm hoping to then have some time to contribute more meaningfully to the package, as the list of issues has grown faster than I've been able to keep up with. I appreciate all the effort you've put in here and would more than welcome PRs to improve things. I usually have time to review things regardless of what else is going on. Looking forward to improving things!

ericphanson commented 1 year ago

Oops, misclick

svilupp commented 1 year ago

I’ve already implemented most of the changes locally. I’ll post some benchmarks and learnings here tomorrow, and open the relevant PRs, if there is interest.

baumgold commented 1 year ago

I’m certainly interested! Thanks for this hard work, @svilupp !

svilupp commented 1 year ago

TL;DR The world makes sense again! Arrow.jl is now the fastest reader (except for one case). It took leveraging threads, skipping unnecessary buffer resizing and some initialization, and adding support for InlineStrings (stack-allocated strings). Details and the implementation used for testing are here
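
As an illustration of the InlineStrings point (my sketch, not the implementation referenced above): after reading, a short-string column can be converted to stack-allocated strings with InlineStrings.jl. This only applies to strings of at most 255 bytes, so it wouldn't help with the 5K-character strings from Task 1; the file name below is hypothetical.

# Hedged sketch using InlineStrings.jl: inlinestrings picks the smallest fixed-size
# InlineString type that fits the column, avoiding per-element heap allocations.
using Arrow, InlineStrings

tbl = Arrow.Table("data_raw/df_short_strings.arrow")   # hypothetical file with short strings
inlined = inlinestrings(tbl.x1)                        # e.g. Vector{String15} (with Missing in the eltype if the column has missings)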

Here are some learnings for those of you seeking Arrow.jl performance:

The rest is probably not suitable for most users, as it involves changing the package internals: