jorgecarleitao / datafusion-python

A Python library to run analytics workloads with the performance of Rust, the flexibility of Python and O(1) cost in moving data between the two. Uses the Apache Arrow in-memory format and its query engine, DataFusion.
MIT License

Speed comparison of package compared to Pandas / Pyarrow_ops #4

Open TomScheffers opened 3 years ago

TomScheffers commented 3 years ago

I wrote a comparison between this package and pandas / pyarrow_ops on a task that joins, groups and aggregates two tables with ~300K rows. The DataFusion package is about 3-5 times slower than the alternatives.

What is causing this performance hit? Is it the serialization between C / Python or is it the performance of DataFusion itself?

import datafusion, time
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow_ops import head, join, groupby

f1 = "path to local parquet file"
f2 = "path to local parquet file"

# Pandas
time1 = time.time()
t1 = pq.read_table(f1, columns=['sku_key', 'option_key']).to_pandas()
t2 = pq.read_table(f2, columns=['sku_key', 'economical']).to_pandas()
r = t1.merge(t2, on=['sku_key']).groupby(['option_key']).agg({'economical': 'sum'})
print("Query in Pandas took:", time.time() - time1)
print(r.head())

# Pyarrow ops
time2 = time.time()
t1 = pq.read_table(f1, columns=['sku_key', 'option_key'])
t2 = pq.read_table(f2, columns=['sku_key', 'economical'])
r = groupby(join(t1, t2, on=['sku_key']), by=['option_key']).agg({'economical': 'sum'})
print("\nQuery in Pyarrow ops took:", time.time() - time2)
head(r)

# Datafusion
time3 = time.time()

f = datafusion.functions
ctx = datafusion.ExecutionContext()
ctx.register_parquet("skus", f1)
ctx.register_parquet("stock_current", f2)
result = ctx.sql("SELECT option_key, SUM(economical) as stock FROM stock_current as sc JOIN skus as sk USING (sku_key) GROUP BY option_key").collect()
r = pa.Table.from_batches(result)
print("\nQuery in DataFusion took:", time.time() - time3)
head(r)

Query in Pandas took: 0.6710879802703857
            economical
option_key
15847197             4
15978197           455
15984669            56
15985197           907
16066197           460

Query in Pyarrow ops took: 1.0179059505462646
Row  option_key  economical
0    15847197    4
1    15978197    455
2    15984669    56
3    15985197    907
4    16066197    460

Query in DataFusion took: 3.2192792892456055
Row  option_key  stock
0    26284326    0
1    25214207    -1
2    30372204    16
3    33163308    156
4    26880505    10

jorgecarleitao commented 3 years ago

Hi @TomScheffers, for some reason I did not get this notification!! :(

Thanks a lot for raising this. I agree with you that it is not acceptable; the issue is valid 👍

I tried this before and IMO the cause is DataFusion itself, specifically the group-by operation. It has a performance issue: even Spark is faster than DataFusion for a group-by. If you try an aggregation or a filter, you will see that the performance is relatively good.

We are tracking this on the DataFusion side, where the penalty is observed. I think that others are observing the same in DataFusion.
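
One way to check where the time goes is to time each kind of query separately rather than the whole script at once. The following is only a minimal sketch using the standard library: `time_stages` is a hypothetical helper, not part of datafusion-python, and the two workloads are placeholders to be replaced by real queries (for example a filter-only query, an aggregation, and the full join + GROUP BY query from the report above).

```python
import time


def time_stages(stages, repeats=3):
    """Run each named stage `repeats` times; return its best wall-clock time in seconds.

    `stages` is a list of (name, zero-argument callable) pairs.
    """
    timings = {}
    for name, fn in stages:
        best = float("inf")
        for _ in range(repeats):
            start = time.perf_counter()  # monotonic, high-resolution timer
            fn()
            best = min(best, time.perf_counter() - start)
        timings[name] = best
    return timings


# Placeholder workloads (assumptions for illustration only); swap in callables
# that run the real queries against the same Parquet files.
stages = [
    ("sum_only", lambda: sum(range(100_000))),
    ("sort", lambda: sorted(range(100_000), reverse=True)),
]

timings = time_stages(stages)
for name, seconds in timings.items():
    print(f"{name}: {seconds:.4f}s")
```

Taking the best of several repeats reduces noise from file-system caching and one-off start-up costs, which a single `time.time()` difference around the whole script cannot separate out.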

TomScheffers commented 3 years ago

@jorgecarleitao Thank you for your reply. Obviously, the best place to solve this issue is at its source, so good luck with that. I love the work you guys are doing on DataFusion as an in-memory Arrow interface.

For the Python users among us: I am working on a DataFusion-like package in Python (wombat_db) to interface with Arrow in a dataframe-like fashion. I would like some feedback on the project, so please check it out!

jorgecarleitao commented 3 years ago

fyi @andygrove @alamb, as IMO this is another good way of benchmarking. Note that datafusion-python is just building and running the query.

Note also that the result is wrong.
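
To pin down a wrong result, the outputs have to be compared by key rather than by row position, since a GROUP BY without ORDER BY guarantees no particular row order. Below is a minimal sketch of such a comparison. `diff_groups` is a hypothetical helper, and the sample rows are only the five-row previews printed in the report above, not the full tables.

```python
def diff_groups(left, right):
    """Compare two (key, value) result sets without relying on row order.

    Returns (keys only in left, keys only in right, shared keys whose values differ).
    """
    left, right = dict(left), dict(right)
    only_left = sorted(left.keys() - right.keys())
    only_right = sorted(right.keys() - left.keys())
    changed = sorted(k for k in left.keys() & right.keys() if left[k] != right[k])
    return only_left, only_right, changed


# First five rows printed by Pandas and pyarrow_ops in the report above.
pandas_head = [(15847197, 4), (15978197, 455), (15984669, 56), (15985197, 907), (16066197, 460)]
# The same rows in reverse order, to show that ordering does not matter.
reordered = list(reversed(pandas_head))
# First five rows printed by DataFusion in the report above.
datafusion_head = [(26284326, 0), (25214207, -1), (30372204, 16), (33163308, 156), (26880505, 10)]

print(diff_groups(pandas_head, reordered))
print(diff_groups(pandas_head, datafusion_head))
```

The two previews share no keys, so the printed `head` output alone can neither confirm nor locate the mismatch. Running the same comparison on the full result tables would show which `option_key` groups actually differ.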