dask / dask-expr


We tend to read data multiple times from storage #218

Closed phofl closed 1 year ago

phofl commented 1 year ago

I exchanged a couple of messages with Rick on this topic yesterday.

Our optimization logic has a weakness when it comes to reading data (the issue extends to other methods as well, but let's focus on I/O to keep it a bit simpler). We tend to read data multiple times when we have different branches in our tree from the same root. One example:

df = read_parquet("taxi.parquet")
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0

q = df[["tipped", "bpf_valid", "tips"]].sum()

tips is read twice in this case: once for the NE expression that is used for the assignment, and once more for the sum aggregation at the end.

This is a problem that gets worse as the query gets more complicated. It's triggered whenever one operation operates on two DataFrames that share the same root.

Ideally, we would only read the data once.

read_parquet(path, columns=["base_passenger_fare", "tips"], ...)["tips"]

There might be reasons not to do this (memory pressure, for example), but skipping the combined read is probably only an option when we are not reading from remote storage. Our current optimization logic does not provide a mechanism to address this.

This isn't a first-order priority, but we should address it soonish.
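For reference, a quick way to see the duplicated reads is to print the optimized expression tree (a minimal sketch assuming the dask_expr API used later in this thread and a local taxi.parquet file):

import dask_expr as dx

df = dx.read_parquet("taxi.parquet")
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0
q = df[["tipped", "bpf_valid", "tips"]].sum()

# The tree contains two separate ReadParquet leaves that both request "tips"
q.optimize(fuse=False).pprint()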

This extends to multiple operations as well:

df = read_parquet("taxi.parquet")
df = df.replace(1, 100).fillna(100)
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0

q = df[["tipped", "bpf_valid", "tips"]].sum()

We will apply replace and fillna to tips twice as well.

cc @rjzamora

mrocklin commented 1 year ago

Maybe we can handle this in a final global optimization pass (like blockwise fusion) that happens after simplify/lower. We would look at all leaves of the graph, identify leaves that can be merged, and then replace those nodes with projections on some common read.
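As a rough illustration of that pass, here is a toy sketch with stand-in classes (not dask-expr's real expression types): group the read leaves by path, build one read per path with the union of the requested columns, and map each original leaf to a projection on that shared read.

from dataclasses import dataclass


@dataclass(frozen=True)
class ReadParquet:  # stand-in for the real IO expression
    path: str
    columns: tuple


@dataclass(frozen=True)
class Projection:  # stand-in for a column projection
    frame: object
    columns: tuple


def merge_common_reads(leaves):
    # Group the IO leaves by path
    by_path = {}
    for leaf in leaves:
        by_path.setdefault(leaf.path, []).append(leaf)

    # One shared read per path with the union of all requested columns;
    # each original leaf becomes a projection on that shared read
    replacements = {}
    for path, group in by_path.items():
        union = tuple(sorted({c for leaf in group for c in leaf.columns}))
        shared = ReadParquet(path, union)
        for leaf in group:
            replacements[leaf] = Projection(shared, leaf.columns)
    return replacements


# Two branches that read overlapping column sets from the same file
leaves = [
    ReadParquet("taxi.parquet", ("tips",)),
    ReadParquet("taxi.parquet", ("base_passenger_fare", "tips")),
]
for old, new in merge_common_reads(leaves).items():
    print(old, "->", new)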

rjzamora commented 1 year ago

Maybe we can handle this in a final global optimization pass (like blockwise fusion) that happens after simplify/lower.

Right, this is exactly what I had in mind.

mrocklin commented 1 year ago

I'm hopeful that that isn't too hard

phofl commented 1 year ago

Same here. Follow-up operations could get a little tricky, though: sometimes we remove whole operations if they reference columns that are dropped later on.

mrocklin commented 1 year ago

I think that after we do this pass there's no more simplification/lowering. We've got the final read_parquet calls that the rest of the tree will want. We produce these with a single read_parquet call followed by some projections. I think that everything should be smooth from then on. Anything I'm missing?

phofl commented 1 year ago

The following is something that I had in mind:

df = read_parquet(...)
df = df.replace(..).fillna(...).astype(...)
df["new"] = df.old != 0
df[["new", "old"]].sum()

We will still end up executing replace, fillna and astype twice on column "old" if we only fuse the read_parquet calls. This isn't as important as I/O right now, but it has the potential to explode if you have longer queries and do more of these operations (setitem, concat, merge, ...). The worst case is that you end up executing most of your query n times, which makes it worse than the status quo in dask.dataframe.

Edit: The example wasn't ideal to illustrate my point. This gets exponentially worse if we add an operation that blocks projections, e.g.

df = read_parquet(...)
df = df.replace(..).fillna(...).astype(...).dropna()
df["new"] = df.old != 0
...
df[["new", "old"]].sum()

We end up executing df.replace(..).fillna(...).astype(...).dropna() on the whole DataFrame once per setitem call, for example.

rjzamora commented 1 year ago

I agree with @phofl that this will be tricky to solve "completely", but I'm hopeful that the initial step of tackling IO will be both tractable and valuable.

Regarding the "trickiness", I'm currently looking at a variation on one of your examples:

import dask_expr as dx
import pandas as pd

path = "simple.parquet"
pdf = pd.DataFrame({"x": [0, 1, 2, 3] * 4, "y": range(16), "z": [None, 1, 2, 3] * 4})
dx.from_pandas(pdf, 2).to_parquet(path)

df = dx.read_parquet(path)
df = df.replace(1, 100).fillna(100)
df["xx"] = df.x != 0
df["yy"] = df.y != 0
q = df[["xx", "yy", "x"]].sum()

q.optimize(fuse=False).pprint()

This workflow produces

Sum:
  Projection: columns=['xx', 'yy', 'x']
    Assign: key='yy'
      Assign: key='xx'
        Fillna: value=100
          Replace: to_replace=1 value=100
            ReadParquet: path='simple.parquet' columns=['x'] kwargs={'dtype_backend': None}
        NE: right=0
          Fillna: value=100
            Replace: to_replace=1 value=100
              ReadParquet: path='simple.parquet' columns=['x'] kwargs={'dtype_backend': None} _series=True
      NE: right=0
        Fillna: value=100
          Replace: to_replace=1 value=100
            ReadParquet: path='simple.parquet' columns=['y'] kwargs={'dtype_backend': None} _series=True

which can ultimately get fused into the same task:


Sum:
  Fused(8e8c6):
  | Projection: columns=['xx', 'yy', 'x']
  |   Assign: key='yy'
  |     NE: right=0
  |       Fillna: value=100
  |         Replace: to_replace=1 value=100
  |           ReadParquet: path='simple.parquet' columns=['y'] kwargs={'dtype_backend': None} _series=True
  |     Assign: key='xx'
  |       NE: right=0
  |         Fillna: value=100
  |           Replace: to_replace=1 value=100
  |             ReadParquet: path='simple.parquet' columns=['x'] kwargs={'dtype_backend': None} _series=True
  |       Fillna: value=100
  |         Replace: to_replace=1 value=100
  |           ReadParquet: path='simple.parquet' columns=['x'] kwargs={'dtype_backend': None}

However, it is clear that the slightly-different ReadParquet operands will result in us reading the same data and calling replace/fillna multiple times within that fused task.

It wouldn't be difficult to find/replace overlapping ReadParquet operations so that the logic looks something like this:

Sum:
  Projection: columns=['xx', 'yy', 'x']
    Assign: key='yy'
      Assign: key='xx'
        Fillna: value=100
          Replace: to_replace=1 value=100
            Projection: ['x']
              ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}
        NE: right=0
          Fillna: value=100
            Replace: to_replace=1 value=100
              Projection: 'x'
                ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}
      NE: right=0
        Fillna: value=100
          Replace: to_replace=1 value=100
            Projection: 'y'
              ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}

In this case, we would avoid redundant IO operations (all the ReadParquet expressions will have the same name and therefore correspond to the same task), but we would still end up repeating the Fillna(Replace(df['x'])) operation, because one of the branches projects to a Series and the other to a single-column DataFrame.
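To spell out why those two chains stay distinct: projecting with a list yields a single-column DataFrame while projecting with a scalar label yields a Series, so the two expressions get different names and therefore different tasks. A quick pandas illustration:

import pandas as pd

pdf = pd.DataFrame({"x": [0, 1, 2], "y": [3, 4, 5]})
print(type(pdf[["x"]]))  # single-column DataFrame
print(type(pdf["x"]))    # Series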

After tackling IO, I suppose we could try to avoid the repeated Fillna(Replace(df['x'])) operation by "pushing up" the DataFrame-to-Series conversion. I'm not sure if the effort will be worth the benefit (I haven't thought through the logic), but this idea could give us something like:

Sum:
  Projection: columns=['xx', 'yy', 'x']
    Assign: key='yy'
      Assign: key='xx'
        Fillna: value=100
          Replace: to_replace=1 value=100
            Projection: ['x']
              ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}
        NE: right=0
          Projection: 'x'
            Fillna: value=100
              Replace: to_replace=1 value=100
                Projection: ['x']
                  ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}
      NE: right=0
        Fillna: value=100
          Replace: to_replace=1 value=100
            Projection: 'y'
              ReadParquet: path='simple.parquet' columns=['x', 'y'] kwargs={'dtype_backend': None}

(Note that we will now produce the same task for both Fillna(Replace(df['x'])) operations.)

rjzamora commented 1 year ago

We end up doing df.replace(..).fillna(...).astype(...).dropna() on the whole DataFrame times the number of setitem calls for example.

I have a feeling that things might not be as bad as you are thinking in this case. You may be running a slightly different example, but I'm seeing that the repeated NE(Projection(Fillna(Replace(ReadParquet)))) pattern all turns into the same task at graph-construction time.

phofl commented 1 year ago

Hm ok this is weird. Running the following example:

import numpy as np
from dask_expr import read_parquet

df = read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)
df = df.replace(1, 100).select_dtypes(include=[np.number]).fillna(100)
df["tipped"] = df.tips != 0
df["dp"] = df.driver_pay != 0
q = df[["tipped", "dp", "tips", "driver_pay"]].sum()

triples the number of replace and fillna tasks shown in the progress dashboard, compared to a query without the setitem statements.

Running this locally on one of these files creates a Dask graph that has 3 entries for each of them. Am I missing something here?

I agree that we should address the I/O stuff first though!

rjzamora commented 1 year ago

Yeah, that is a good example where the _projection_passthrough trick (which is set to True for Replace) ends up "multiplying" the Replace operation. In other words, each projection ends up "simplifying up" its Replace parent to only include the column it "needs".

In this case, we can always rewrite the IO operations so that columns includes the union of the various projections, but we would also need logic to "push back up" the corresponding projections through the Replace operations in the case that the various IO instances have the same Replace parent.

This does feel a bit tricky, because it requires us to inspect the entire graph (rather than a local parent-child environment). It would be nice if there were a clean way to just avoid this projection-multiplication problem in simplify, but that also seems tricky.

EDIT: One way to mitigate this problem a bit may be to "aggregate" the _simplify_up logic that is triggered by _projection_passthrough == True. For example, if the Replace child has a way to find out about other parents in that step, it could take the union of multiple projections.
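A tiny sketch of that aggregation idea (the helper here is hypothetical, not an existing dask-expr function): collect the columns requested by all parent projections and push a single union projection down, instead of one projection per parent.

def union_parent_projections(parent_columns):
    # Scalar labels come from Series projections, lists from DataFrame projections
    columns = set()
    for cols in parent_columns:
        columns.update(cols if isinstance(cols, (list, tuple)) else [cols])
    return sorted(columns)


# One parent projects the Series 'x', another the DataFrame ['y']
print(union_parent_projections(["x", ["y"]]))  # ['x', 'y']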

rjzamora commented 1 year ago

Closing this as resolved by https://github.com/dask-contrib/dask-expr/pull/220, but we can re-open if needed.