narwhals-dev / narwhals

Lightweight and extensible compatibility layer between dataframe libraries!
https://narwhals-dev.github.io/narwhals/
MIT License

[perf]: Dask implementation should use regular getitem over loc #1349

Closed phofl closed 1 week ago

phofl commented 1 week ago

Describe the bug

loc isn't very well supported in Dask, since all the special implementations that pandas has don't work in a distributed environment. I would generally recommend just using getitem if you want to either select columns or apply a filter. Currently, using loc blocks column projections for some reason, and we don't have many tests for loc-specific cases.
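
As a rough illustration in plain pandas (used here only as a stand-in, since Dask's dataframe API mirrors it), both operations have a getitem spelling that returns the same result as loc. The column names come from the query plan below; the values are made up.

```python
import pandas as pd

# Illustrative data only; the column names are borrowed from the lineitem table.
df = pd.DataFrame({"l_quantity": [17, 36, 8], "l_discount": [0.04, 0.09, 0.10]})
mask = df["l_quantity"] <= 17

# Filter rows: getitem with a boolean Series versus loc with the same mask.
filtered_getitem = df[mask]
filtered_loc = df.loc[mask]

# Select columns: getitem with a list of names versus loc on the column axis.
selected_getitem = df[["l_quantity"]]
selected_loc = df.loc[:, ["l_quantity"]]
```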

Steps or code to reproduce the bug

Run q1 for Dask, but call .optimize(fuse=False).pprint() instead of .compute().

Expected results

The ReadParquetFSSpec expression should be restricted to a subset of the columns

Actual results

SortValuesBlockwise: sort_function=<methodcaller: sort_values> sort_kwargs={'by': ['l_returnflag', 'l_linestatus'], 'ascending': True, 'na_position': 'first', 'ignore_index': False}
  ResetIndex:
    ColumnsSetter: columns=('sum_qty', 'sum_base_price', 'sum_disc_price', 'sum_charge', 'avg_qty', 'avg_price', 'avg_disc', 'count_order')
      Projection: columns=[('l_quantity', 'sum'), ('l_extendedprice', 'sum'), ('disc_price', 'sum'), ('charge', 'sum'), ('l_quantity', 'mean'), ('l_extendedprice', 'mean'), ('l_discount', 'mean'), ('l_returnflag', 'size')]
        DecomposableGroupbyAggregation(TreeReduce): split_every=8
          DecomposableGroupbyAggregation(GroupByChunk): funcs=[('count-l_discount-600ee3bd25d7c4d5c2a5ddb0de7ca968', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_discount', 'func': <methodcaller: count>}), ('count-l_extendedprice-2e03ca098804aa8d414ad2ca4371a45f', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_extendedprice', 'func': <methodcaller: count>}), ('count-l_quantity-cba6b095366123379c694b2440e68176', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_quantity', 'func': <methodcaller: count>}), ('size-l_returnflag-8b574197810b39f175d748dec390afb9', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_returnflag', 'func': <methodcaller: size>}), ('sum-charge-134b51212e1b19c4d40dfb4ad041f759', <function _apply_func_to_column at 0x10e085440>, {'column': 'charge', 'func': <methodcaller: sum>}), ('sum-disc_price-aa841df1c0667abb221bc0323032012d', <function _apply_func_to_column at 0x10e085440>, {'column': 'disc_price', 'func': <methodcaller: sum>}), ('sum-l_discount-5a1d744b08b5b676964f608e5c33ad45', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_discount', 'func': <methodcaller: sum>}), ('sum-l_extendedprice-3fe60ac654450a52e82c43c590eb25f5', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_extendedprice', 'func': <methodcaller: sum>}), ('sum-l_quantity-7305c5b69ff16f9d055728f74ef859b3', <function _apply_func_to_column at 0x10e085440>, {'column': 'l_quantity', 'func': <methodcaller: sum>})] observed=True dropna=False
            Projection: columns=['l_quantity', 'l_extendedprice', 'l_discount', 'l_returnflag', 'l_linestatus', 'disc_price', 'charge']
              Assign: disc_price=, charge=
                Projection: columns=['l_discount', 'l_extendedprice', 'l_linestatus', 'l_quantity', 'l_returnflag']
                  Loc:
                    ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                    RenameSeries: index='l_shipdate'
                      RenameSeries: index='l_shipdate'
                        LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                          Projection: columns='l_shipdate'
                            ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                RenameSeries: index='l_extendedprice'
                  Mul:
                    Projection: columns='l_extendedprice'
                      Loc:
                        ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                        RenameSeries: index='l_shipdate'
                          RenameSeries: index='l_shipdate'
                            LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                              Projection: columns='l_shipdate'
                                ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                    RenameSeries: index='l_discount'
                      Sub: left=1
                        Projection: columns='l_discount'
                          Loc:
                            ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                            RenameSeries: index='l_shipdate'
                              RenameSeries: index='l_shipdate'
                                LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                                  Projection: columns='l_shipdate'
                                    ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                RenameSeries: index='l_extendedprice'
                  Mul:
                    RenameSeries: index='l_extendedprice'
                      Mul:
                        Projection: columns='l_extendedprice'
                          Loc:
                            ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                            RenameSeries: index='l_shipdate'
                              RenameSeries: index='l_shipdate'
                                LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                                  Projection: columns='l_shipdate'
                                    ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                        RenameSeries: index='l_discount'
                          Sub: left=1.0
                            Projection: columns='l_discount'
                              Loc:
                                ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                                RenameSeries: index='l_shipdate'
                                  RenameSeries: index='l_shipdate'
                                    LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                                      Projection: columns='l_shipdate'
                                        ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                    RenameSeries: index='l_tax'
                      Add: left=1.0
                        Projection: columns='l_tax'
                          Loc:
                            ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}
                            RenameSeries: index='l_shipdate'
                              RenameSeries: index='l_shipdate'
                                LE: right=datetime.datetime(1998, 9, 2, 0, 0)
                                  Projection: columns='l_shipdate'
                                    ReadParquetFSSpec: path='data/lineitem.parquet' kwargs={'dtype_backend': 'pyarrow'}

Please run narwhals.show_version() and enter the output below.

System:
    python: 3.12.7 | packaged by conda-forge | (main, Oct  4 2024, 15:57:01) [Clang 17.0.6 ]
executable: /Users/patrick/mambaforge/envs/narwhals/bin/python3.12
   machine: macOS-14.3.1-arm64-arm-64bit

Python dependencies:
     narwhals: 1.13.3
       pandas: 2.2.3
       polars: 1.12.0
         cudf:
        modin:
      pyarrow: 18.0.0
        numpy: 2.1.3

Relevant log output

No response

MarcoGorelli commented 1 week ago

thanks @phofl

I don't think we can do this for selecting multiple column names: if the column names are booleans, trying to select columns by name will filter rows by a mask instead

But we can do it for:

phofl commented 1 week ago

Duplicated columns don't work in Dask fwiw, which makes the boolean case basically non-existent?

MarcoGorelli commented 1 week ago

there don't need to be duplicates, you could have two columns called [True, False], no?

phofl commented 1 week ago

We would remove that selection because it selects as many columns as the frame has, which makes it a no-op

phofl commented 1 week ago

There is technically [True] and [False] though

MarcoGorelli commented 1 week ago

this is what i mean:

In [12]: def func(df, column_names):
    ...:     return df[column_names]
    ...:

In [13]: func(pd.DataFrame({True: [1,2], False: [4,5]}), [True, False])
Out[13]:
   True   False
0      1      4

In [14]: func(pd.DataFrame({'a': [1,2], 'b': [4,5]}), ['a', 'b'])
Out[14]:
   a  b
0  1  4
1  2  5

MarcoGorelli commented 1 week ago

having said that, the dtype of df.columns is always known (and inexpensive) in Dask, right?

So, could we just do:

if df.columns.dtype.kind != 'b':
    return df[column_names]
else:
    return df.loc[:, column_names]

?

(we could probably make such a utility and reuse it for pandas backend too)
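
A minimal sketch of what such a utility could look like for the pandas backend, assuming pandas 2.x (where an all-boolean columns index reports dtype kind 'b'). The name select_columns_by_name is hypothetical and not part of narwhals; the frames are the ones from the example above.

```python
import pandas as pd


def select_columns_by_name(df, column_names):
    """Hypothetical helper: select columns without triggering a row mask."""
    if df.columns.dtype.kind != "b":
        # Column labels are not booleans, so a list of names passed to
        # getitem cannot be mistaken for a boolean row mask.
        return df[column_names]
    # Boolean labels: the explicit row slice applies the list to the
    # column axis, so it is never used to filter rows.
    return df.loc[:, column_names]


str_df = pd.DataFrame({"a": [1, 2], "b": [4, 5]})
bool_df = pd.DataFrame({True: [1, 2], False: [4, 5]})

by_name = select_columns_by_name(str_df, ["a", "b"])
by_bool = select_columns_by_name(bool_df, [True, False])

# For contrast: plain getitem on the boolean-labelled frame filters rows.
row_masked = bool_df[[True, False]]
```

The fallback branch only guarantees that the rows are left alone. pandas may still read an all-boolean list passed to loc as a mask over the columns, so treat this as a starting point rather than a complete fix.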

phofl commented 1 week ago

Good point

Yeah, that would work. Boolean columns in Dask don't really work (tried to select something right now and it continues to raise 😅), so you also don't need to worry much about boolean columns

But yes, accessing the dtype of the columns is cheap