dask / dask-expr

BSD 3-Clause "New" or "Revised" License

Something's off with Parquet optimization and computing #131

Open mrocklin opened 1 year ago

mrocklin commented 1 year ago

This is one of my standard examples, and it's quite sad today

import dask.dataframe as dd
import dask_expr as dx

df = dx.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
)
df["tipped"] = df.tips != 0
df.groupby(df.hvfhs_license_num).tipped.mean()

It looks like this second cell triggers computation when it should be instantaneous. I suspect the parquet lengths stuff. cc @rjzamora

rjzamora commented 1 year ago

Interesting - Should definitely not trigger computation. I'll look into it.

rjzamora commented 1 year ago

Quick note that the problem is unrelated to ReadParquet. We don't support grouping on Series yet, so df.groupby(df.hvfhs_license_num) is technically not allowed. Trying to figure out why we end up hanging instead of raising an error here.
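A minimal sketch of the "raise instead of hang" behaviour described above, using toy stand-ins rather than dask-expr internals. `ToySeries` and `validate_groupby_keys` are hypothetical names invented for illustration; the only assumption taken from the thread is that column names are supported as grouping keys and Series objects are not.

```python
class ToySeries:
    """Hypothetical stand-in for a Series expression; not a dask-expr class."""

    def __init__(self, name):
        self.name = name


def validate_groupby_keys(by):
    """Reject unsupported grouping keys up front, before any graph is built."""
    keys = list(by) if isinstance(by, (list, tuple)) else [by]
    for key in keys:
        if isinstance(key, ToySeries):
            # Fail fast with an actionable message instead of proceeding.
            raise NotImplementedError(
                "Grouping by a Series is not supported; "
                f"pass the column name {key.name!r} instead."
            )
    return keys
```

With a check like this, `validate_groupby_keys("hvfhs_license_num")` passes through, while passing a `ToySeries` fails immediately with a message that points at the supported spelling.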

mrocklin commented 1 year ago

If you watch the dashboard there are definitely read_parquet tasks running

On Thu, Jun 1, 2023 at 3:00 PM Richard (Rick) Zamora wrote:

Quick note that the problem is unrelated to ReadParquet. We don't support grouping on Series yet, so df.groupby(df.hvfhs_license_num) is technically not allowed. Trying to figure out why we end up hanging instead of raising an error here.


rjzamora commented 1 year ago

If you watch the dashboard there are definitely read_parquet tasks running

Sure, but I think that's more about Groupby doing something unexpected. None of that happens if you just do:

df["tipped"] = df.tips != 0
df.groupby('hvfhs_license_num').tipped.mean()

mrocklin commented 1 year ago

Ah, I see. Well then, there's this:

%%prun
df["tipped"] = df.tips != 0

df.groupby("hvfhs_license_num").tipped.mean()

This takes two seconds

         212142 function calls (209178 primitive calls) in 4.493 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      220    2.150    0.010    2.150    0.010 {method 'acquire' of '_thread.lock' objects}
        8    1.076    0.135    4.266    0.533 arrow.py:958(_collect_dataset_info)
        8    1.004    0.126    1.008    0.126 {method 'finish' of 'pyarrow._dataset.DatasetFactory' objects}
53237/52782    0.010    0.000    0.017    0.000 {built-in method builtins.isinstance}
        8    0.008    0.001    0.011    0.001 {method 'empty_table' of 'pyarrow.lib.Schema' objects}
     5760    0.006    0.000    0.007    0.000 {method 'normalize_path' of 'pyarrow._fs.FileSystem' objects}
      306    0.005    0.000    0.005    0.000 {method 'reduce' of 'numpy.ufunc' objects}
      616    0.004    0.000    0.005    0.000 base.py:229(construct_from_string)
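For anyone reproducing a profile like the one above outside a notebook, here is a rough sketch using the standard library's `cProfile` and `pstats`, sorted by internal time as in the `%%prun` output. `slow_step` and `profile_top` are hypothetical helpers; `slow_step` only sleeps and stands in for the real workload.

```python
import cProfile
import io
import pstats
import time


def slow_step():
    """Hypothetical stand-in for the expensive call being investigated."""
    time.sleep(0.01)


def profile_top(func, limit=10):
    """Run `func` under cProfile and return a report sorted by internal time."""
    profiler = cProfile.Profile()
    profiler.enable()
    func()
    profiler.disable()
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("tottime").print_stats(limit)
    return buffer.getvalue()
```

Calling `print(profile_top(slow_step))` prints a table with the same columns as the one above.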

rjzamora commented 1 year ago

Right, I see. I suppose it's probably time to start caching the dataset_info result outside the ReadParquet object itself. We currently need to extract basic metadata information every time a new ReadParquet expression is created.

I just did a quick experiment to confirm that this change takes that cell down to ~20ms. However, there are other groupby-related issues that produce bad results (although the .agg equivalent seems to be fine...).
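A toy sketch of the idea of caching the metadata outside the expression object, so that creating a new expression for the same dataset does not repeat the scan. This is not the dask-expr implementation: `ToyReadParquet`, `collect_dataset_info`, `cached_dataset_info`, and the `CALLS` counter are all hypothetical, and the "scan" just returns a small dict.

```python
# Module-level cache shared by every expression object (toy example only).
_DATASET_INFO_CACHE = {}
CALLS = {"collect": 0}


def collect_dataset_info(path):
    """Pretend to run the expensive metadata scan for `path`."""
    CALLS["collect"] += 1
    return {"path": path, "columns": ["tips", "hvfhs_license_num"]}


def cached_dataset_info(path):
    """Return metadata for `path`, scanning at most once per path."""
    if path not in _DATASET_INFO_CACHE:
        _DATASET_INFO_CACHE[path] = collect_dataset_info(path)
    return _DATASET_INFO_CACHE[path]


class ToyReadParquet:
    """Hypothetical expression; each projection creates a new object."""

    def __init__(self, path, columns=None):
        self.path = path
        self.columns = columns

    @property
    def dataset_info(self):
        # Looked up in the shared cache rather than stored on the instance.
        return cached_dataset_info(self.path)

    def project(self, columns):
        return ToyReadParquet(self.path, columns)
```

Because the cache is keyed on the path and lives outside the object, `expr.project([...]).dataset_info` reuses the result already computed for `expr`.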

rjzamora commented 1 year ago

Note that https://github.com/mrocklin/dask-expr/pull/132 provides one possible solution to the caching problem. Will need to address the groupby issue separately.

rjzamora commented 1 year ago

I think the ReadParquet pain reported here is now resolved. However, I suppose we may want to consider allowing more than one object to be cached in the case that the objects originate from ReadParquet expressions reading from different paths. For example, #132 will be less effective if we are reading two different parquet datasets and joining them.
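To illustrate the concern, here is a small sketch comparing a single-entry cache with a bounded multi-entry one when two paths are accessed alternately, as a join might do. It assumes the current cache holds one entry, which is how the comment above reads. All names (`scan`, `SingleSlotCache`, `multi_slot_get`, `count_scans`) are hypothetical, and `functools.lru_cache` is just one convenient way to get a bounded multi-entry cache.

```python
from functools import lru_cache

scans = []  # records every simulated metadata scan


def scan(path):
    """Pretend to collect metadata for `path`."""
    scans.append(path)
    return ("info", path)


class SingleSlotCache:
    """Remembers only the most recently requested path."""

    def __init__(self):
        self._key = None
        self._value = None

    def get(self, path):
        if self._key != path:
            self._key, self._value = path, scan(path)
        return self._value


@lru_cache(maxsize=8)
def multi_slot_get(path):
    """Keeps up to 8 paths, evicting the least recently used."""
    return scan(path)


def count_scans(get, paths):
    """Count how many scans `get` triggers for a sequence of lookups."""
    scans.clear()
    for path in paths:
        get(path)
    return len(scans)
```

Alternating between two paths makes the single-slot cache rescan on every lookup, while the bounded cache scans each path once.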

mrocklin commented 1 year ago

I'm running this:

import dask.dataframe as dd
import dask_expr as dx

df = dx.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
)
df = df[df.base_passenger_fare > 0]
result = df.base_passenger_fare.mean()
result.compute()

And it's stuck in a loop calling gather-pq-parts over and over again. I'm not sure why it would need to call this function at all.

rjzamora commented 1 year ago

Oh no! I’m OOO, but I’ll try to look into this soon.

mrocklin commented 1 year ago

Turning off predicate pushdown solves the issue:

diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py
index e9c4370..7b1e5bf 100644
--- a/dask_expr/io/parquet.py
+++ b/dask_expr/io/parquet.py
@@ -115,6 +115,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
         if isinstance(parent, Filter) and isinstance(
             parent.predicate, (LE, GE, LT, GT, EQ, NE, And, Or)
         ):
+            return
             # Predicate pushdown
             filters = _DNF.extract_pq_filters(self, parent.predicate)
             if filters:

rjzamora commented 1 year ago

And it's stuck in a loop calling gather-pq-parts over and over again. I'm not sure why it would need to call this function at all.

I think the column projection and predicate pushdown are just producing several different variations of the ReadParquet expression. For every new variation (different columns and/or filters), we are calling npartitions somewhere, which is triggering a fresh metadata-collection call (since npartitions is calculated from _divisions).
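A toy model of that explanation: if metadata is cached only per object, every new variation produced by projection or pushdown pays for its own scan the first time `npartitions` is read. `ToyVariant` and its methods are hypothetical and only mimic the shape of the problem; the one detail taken from the thread is that `npartitions` is derived from `_divisions`.

```python
from functools import cached_property


class ToyVariant:
    """Hypothetical expression whose divisions require a metadata scan."""

    scans = 0  # class-wide count of simulated metadata collections

    def __init__(self, path, columns=None, filters=None):
        self.path = path
        self.columns = columns
        self.filters = filters

    @cached_property
    def _divisions(self):
        # Cached per instance only, so each new variant scans again.
        ToyVariant.scans += 1
        return (None, None, None, None)  # pretend there are 3 partitions

    @property
    def npartitions(self):
        return len(self._divisions) - 1

    def with_columns(self, columns):
        return ToyVariant(self.path, columns, self.filters)

    def with_filters(self, filters):
        return ToyVariant(self.path, self.columns, filters)
```

Reading `npartitions` twice on one object costs a single scan, but three variants of the same dataset cost three scans, which matches the repeated metadata tasks described above.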

About to look into this further...