dask-contrib / dask-deltatable

A Delta Lake reader for Dask
BSD 3-Clause "New" or "Revised" License

`read_deltalake` vs `read_parquet` performance #58

Open j-bennet opened 1 year ago

j-bennet commented 1 year ago

I did a quick test reading Delta Lake data in a notebook on a Coiled cluster from s3, comparing `dd.read_parquet` with `ddt.read_deltalake`.

Cluster: https://cloud.coiled.io/clusters/245026/information?account=dask-engineering.

Data is located in s3://coiled-datasets/delta/.

Results:

| dataset | computation | timing (`read_parquet`) | timing (`read_deltalake`) |
| --- | --- | --- | --- |
| ds20f_100M | `ddf["int1"].sum().compute()` | CPU: user 43.5 ms, sys 10.7 ms, total 54.2 ms; wall 8.04 s | CPU: user 159 ms, sys 38.4 ms, total 198 ms; wall 55.3 s |
| ds20f_100M | `ddf.describe().compute()` | CPU: user 256 ms, sys 28.7 ms, total 284 ms; wall 20.7 s | CPU: user 380 ms, sys 60.7 ms, total 441 ms; wall 1min 10s |
| ds25f_250M | `ddf["int1"].sum().compute()` | CPU: user 67.1 ms, sys 15.6 ms, total 82.7 ms; wall 16.7 s | CPU: user 666 ms, sys 176 ms, total 842 ms; wall 3min 59s |
| ds25f_250M | `ddf.describe().compute()` | CPU: user 605 ms, sys 70.3 ms, total 675 ms; wall 1min 10s | CPU: user 1.02 s, sys 181 ms, total 1.2 s; wall 4min 2s |
| ds50f_500M | `ddf["int1"].sum().compute()` | CPU: user 204 ms, sys 49.2 ms, total 253 ms; wall 1min 2s | CPU: user 2.93 s, sys 626 ms, total 3.56 s; wall 16min 46s |
| ds50f_500M | `ddf.describe().compute()` | CPU: user 3.59 s, sys 383 ms, total 3.97 s; wall 5min 53s | killed before finishing |

This doesn't look good: `read_deltalake` is roughly 3-16x slower than `read_parquet` across the board, and needs investigation.

j-bennet commented 1 year ago

Second attempt, after merging https://github.com/dask-contrib/dask-deltatable/pull/57.

Cluster: https://cloud.coiled.io/clusters/245853?account=dask-engineering

| dataset | computation | timing (`read_parquet`) | timing (`read_deltalake`) |
| --- | --- | --- | --- |
| ds20f_100M | `ddf["int1"].sum().compute()` | CPU: user 41.2 ms, sys 10.6 ms, total 51.8 ms; wall 9.48 s | CPU: user 181 ms, sys 42.3 ms, total 224 ms; wall 59.8 s |
| ds20f_100M | `ddf.describe().compute()` | CPU: user 243 ms, sys 27.9 ms, total 271 ms; wall 23 s | CPU: user 308 ms, sys 51.7 ms, total 360 ms; wall 1min 1s |
| ds25f_250M | `ddf["int1"].sum().compute()` | CPU: user 63.8 ms, sys 15.9 ms, total 79.7 ms; wall 16.6 s | CPU: user 716 ms, sys 182 ms, total 897 ms; wall 3min 51s |
| ds25f_250M | `ddf.describe().compute()` | CPU: user 623 ms, sys 71 ms, total 694 ms; wall 1min 9s | CPU: user 986 ms, sys 189 ms, total 1.17 s; wall 3min 52s |
| ds50f_500M | `ddf["int1"].sum().compute()` | CPU: user 199 ms, sys 47.5 ms, total 246 ms; wall 1min | CPU: user 2.89 s, sys 799 ms, total 3.69 s; wall 16min 7s |
| ds50f_500M | `ddf.describe().compute()` | CPU: user 3.45 s, sys 383 ms, total 3.83 s; wall 5min 37s | CPU: user 5.36 s, sys 832 ms, total 6.19 s; wall 16min 24s |

The timings are essentially unchanged after that merge, so it looks like `read_deltalake` is still doing something very inefficient.