dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

[BUG] rank() function doesn't work in dsql #272

Open DaceT opened 3 years ago

DaceT commented 3 years ago

What happened:

What you expected to happen:

Minimal Complete Verifiable Example:

import cudf
from dask_sql import Context
dc = Context()

df = cudf.DataFrame({'Animal': ['cat', 'penguin', 'dog', 'spider'],
                     'Number_legs': [4, 2, 4, 8]})
dc.create_table('my_table', df)
dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()

Anything else we need to know?:

KeyError: 'rank'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_45366/1186692474.py in <module>
      7                         'Number_legs': [4, 2, 4, 8]})
      8 dc.create_table('my_table', df)
----> 9 dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    420         rel, select_names, _ = self._get_ral(sql)
    421
--> 422         dc = RelConverter.convert(rel, context=self)
    423
    424         if dc is None:

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in convert(self, rel, context)
    249
    250         for window in rel.groups:
--> 251             dc = self._apply_window(
    252                 window, constants, constant_count_offset, dc, field_names, context
    253             )

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _apply_window(self, window, constants, constant_count_offset, dc, field_names, context)
    292             temporary_columns += group_columns
    293
--> 294         operations, df = self._extract_operations(window, df, dc, context)
    295         for _, _, cols in operations:
    296             temporary_columns += cols

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _extract_operations(self, window, df, dc, context)
    394         except KeyError:  # pragma: no cover
    395             try:
--> 396                 operation = context.functions[operator_name]
    397             except KeyError:  # pragma: no cover
    398                 raise NotImplementedError(f"{operator_name} not (yet) implemented")

AttributeError: 'Context' object has no attribute 'functions'



**Environment**:

dask-sql version: **0.3.10.dev30+ge811e54**
Python version: **3.8**
Operating System: **Ubuntu 18.04**
Install method (conda, pip, source): **conda**
ayushdg commented 3 years ago

Thanks for raising the issue @DaceT! I believe there are a couple of things going on here:

  1. Dask-sql does not support window rank functions today. Here's a list of initial aggregations supported with window operations: https://github.com/dask-contrib/dask-sql/issues/43#issuecomment-840665868
  2. The error message is a bit confusing: it should ideally raise a NotImplementedError instead of a KeyError. The reason it doesn't is a bug that needs to be fixed here: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rel/logical/window.py#L394-L400

The check on line 398 should look at context.schema[schema_name].functions rather than context.functions, similar to how it's done in other parts of the codebase, like this one: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rex/core/call.py#L787
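
For reference, a sketch of what the corrected lookup could look like (mirroring lines 395-398 of the traceback above; it assumes schema_name is available in the scope of _extract_operations, the way it is in call.py):

try:
    # look the function up on the active schema, not on the Context itself
    operation = context.schema[schema_name].functions[operator_name]
except KeyError:  # pragma: no cover
    raise NotImplementedError(f"{operator_name} not (yet) implemented")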

rajagurunath commented 3 years ago

Thanks @DaceT and @ayushdg, this is a really great catch.

> The check on line 398 should look at context.schema[schema_name].functions rather than context.functions, similar to how it's done in other parts of the codebase, like this one:

I am responsible for this bug: while working on the multiple-schema PR https://github.com/dask-contrib/dask-sql/pull/205, I missed updating the window functions from context.functions to context.schema[schema_name].functions.

Let me fix this!

rajagurunath commented 3 years ago

Hi Everyone,

I have been playing with the ranking window function for the past few days and need some help/suggestions to improve the feature.

Background:

  1. Used the pandas DataFrame's rank API, here.
  2. The OverOperation classes used in dask-sql get an expanding window as their argument; expanding windows don't support the rank function yet, and will only gain it in the upcoming pandas version 1.4.0 (see the sketch right after this list).
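
For reference, a minimal sketch of the expanding rank API expected in pandas 1.4.0 (so it needs pandas >= 1.4 to run; toy data only, not dask-sql code):

import pandas as pd  # Expanding.rank needs pandas >= 1.4

s = pd.Series([4, 2, 4, 8])
# rank of the newest value within each expanding window;
# method="min" matches SQL RANK() semantics
print(s.expanding().rank(method="min"))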

Approach/workaround taken:

There are two cases to consider for this problem:

Single column in the ORDER BY clause:

SELECT *, RANK() OVER (ORDER BY a ASC NULLS LAST) AS a4 FROM a
-- with partition:
SELECT *, RANK() OVER (PARTITION BY a ORDER BY a ASC NULLS LAST) AS a4 FROM a

In the queries above there is only one column a in the ORDER BY clause. Implementing this case is straightforward because there is a one-to-one mapping between the SQL and pandas rank methods.

# ORDER BY a ASC NULLS LAST maps to ascending=True, na_option="bottom";
# method="min" gives SQL RANK() semantics
df.rank(method="min", na_option="bottom", ascending=True)
# the same applies for a partitioned rank
df.groupby("a").rank(method="min", na_option="bottom", ascending=True)

Multiple columns in the ORDER BY clause:

SELECT *, 
RANK() OVER (ORDER BY a NULLS FIRST, b  NULLS FIRST, e) AS a3
FROM a
ORDER BY a NULLS FIRST, b NULLS FIRST, c NULLS FIRST, d NULLS FIRST, e

To handle multiple columns in the ORDER BY clause, we cast the columns to strings and combine them into a single column of tuples (tuples of strings).

res = (
    df.obj[sort_cols]      # df is a groupby object here; .obj is the underlying frame
    .astype(str)           # cast every sort column to string
    .apply(tuple, axis=1)  # combine the columns into one tuple per row
    .rank(method="min", na_option=na_option, ascending=sort_ascending)
    .astype(int)
)

So na_option and sort_ascending can only be specified once, for the combined tuple column, and I am not sure how to handle per-column options such as [a ASC NULLS LAST, b DESC NULLS FIRST]. I tried sorting the DataFrame first (using the ascending and na_position parameters of pd.DataFrame.sort_values) and then applying the rank function, but the results don't seem to match SQLite.
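
One direction that might work for per-column options (an untested sketch; rank_multi is a hypothetical helper, not code from the feature branch): since ranking a single column is a monotone transform, replacing each column by its own rank, computed with that column's ascending/na_option, preserves the lexicographic order, and the tuples of per-column ranks can then be ranked once with method="min":

import pandas as pd

def rank_multi(df, cols, ascending, na_position):
    # rank each column under its own direction and null placement;
    # na_option="top" corresponds to NULLS FIRST, "bottom" to NULLS LAST
    per_col = {
        col: df[col].rank(method="dense", ascending=asc, na_option=na)
        for col, asc, na in zip(cols, ascending, na_position)
    }
    # tuples of per-column ranks order rows exactly like the composite
    # ORDER BY, so one final min-rank reproduces SQL RANK()
    keys = pd.DataFrame(per_col).apply(tuple, axis=1)
    return keys.rank(method="min").astype(int)

df = pd.DataFrame({"a": [1, 2, 1, None], "b": [3.0, 1.0, None, 2.0]})
# ORDER BY a ASC NULLS LAST, b DESC NULLS FIRST
print(rank_multi(df, ["a", "b"], [True, False], ["bottom", "top"]))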

Please let me know if there is any feedback or suggestions for handling the multiple-column case 🤔. Is there something I am missing here, or a better approach?

The feature branch I am currently working on is here.

Apologies for the long post,

Thanks in advance!

paarivarik-engineer commented 2 years ago

@rajagurunath is there any update on the implementation of rank/rownum support in dask-sql?