dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

timestampdiff support #242

Open ayushdg opened 3 years ago

ayushdg commented 3 years ago

The timestampdiff operation currently fails with a missing reinterpret implementation error.

Here's a reproducer:

from dask_sql import Context
from dask import dataframe as dd
import pandas as pd

c = Context()
df = pd.DataFrame({"dt":["2002-06-05 00:00:00", "2005-09-01 00:00:00", "2003-12-03 00:00:00"]})
df2 = pd.DataFrame({"dt2":["2002-06-07 00:00:00", "2002-06-05 00:00:00", "2002-06-05 00:00:00"]})
c.create_table("df", df)
c.create_table("df2", df2)

query = "SELECT timestampdiff(DAY, CAST(dt AS TIMESTAMP),  CAST(dt2 AS TIMESTAMP)) FROM df, df2"
c.sql(query)

Error with:

NotImplementedError                       Traceback (most recent call last)
/tmp/ipykernel_64215/3992071282.py in <module>
     10 
     11 query = "SELECT timestampdiff(DAY, CAST(dt AS TIMESTAMP),  CAST(dt2 AS TIMESTAMP)) FROM df, df2"
---> 12 c.sql(query)

lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    421         rel, select_names, _ = self._get_ral(sql)
    422 
--> 423         dc = RelConverter.convert(rel, context=self)
    424 
    425         if dc is None:

lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     51             else:
     52                 random_name = new_temporary_column(df)
---> 53                 new_columns[random_name] = RexConverter.convert(
     54                     expr, dc, context=context
     55                 )

lib/python3.8/site-packages/dask_sql/physical/rex/convert.py in convert(cls, rex, dc, context)
     60         )
     61 
---> 62         df = plugin_instance.convert(rex, dc, context=context)
     63         logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     64         return df

lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in convert(self, rex, dc, context)
    769     ) -> SeriesOrScalar:
    770         # Prepare the operands by turning the RexNodes into python expressions
--> 771         operands = [
    772             RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    773         ]

lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in <listcomp>(.0)
    770         # Prepare the operands by turning the RexNodes into python expressions
    771         operands = [
--> 772             RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    773         ]
    774 

lib/python3.8/site-packages/dask_sql/physical/rex/convert.py in convert(cls, rex, dc, context)
     60         )
     61 
---> 62         df = plugin_instance.convert(rex, dc, context=context)
     63         logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     64         return df

lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in convert(self, rex, dc, context)
    769     ) -> SeriesOrScalar:
    770         # Prepare the operands by turning the RexNodes into python expressions
--> 771         operands = [
    772             RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    773         ]

lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in <listcomp>(.0)
    770         # Prepare the operands by turning the RexNodes into python expressions
    771         operands = [
--> 772             RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    773         ]
    774 

lib/python3.8/site-packages/dask_sql/physical/rex/convert.py in convert(cls, rex, dc, context)
     60         )
     61 
---> 62         df = plugin_instance.convert(rex, dc, context=context)
     63         logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     64         return df

lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in convert(self, rex, dc, context)
    783                 operation = context.schema[schema_name].functions[operator_name]
    784             except KeyError:  # pragma: no cover
--> 785                 raise NotImplementedError(f"{operator_name} not (yet) implemented")
    786 
    787         logger.debug(

NotImplementedError: reinterpret not (yet) implemented
rajagurunath commented 3 years ago

Hi @ayushdg,

Thanks a bunch for trying this out and for the easy example to reproduce the bug.

A quick explanation of the query gives:

c.explain(query)

LogicalProject(EXPR$0=[CAST(/INT(Reinterpret(-($1, $0)), 86400000)):INTEGER])
  LogicalJoin(condition=[true], joinType=[inner])
    LogicalProject(CAST=[CAST($0):TIMESTAMP(0)])
      LogicalTableScan(table=[[root, df]])
    LogicalProject(CAST=[CAST($0):TIMESTAMP(0)])
      LogicalTableScan(table=[[root, df2]])

Currently, as you mentioned, dask-sql is missing the Reinterpret operator (which is somewhat similar to the cast operation mentioned here).

For testing purposes, I tried using CastOperation as a replacement for Reinterpret. The /INT (IntDivisionOperator) operator was also missing; it was implemented in a PR by @nils-braun that was yet to be merged. Using these two operator implementations, the timestampdiff operation worked fine for me.

I will try to understand more about the Reinterpret operator and raise a PR for this!

Cheers

jdye64 commented 3 years ago

I can take a look at this one

rajagurunath commented 3 years ago

Hi @jdye64

Thanks a lot for looking into this issue.

I have been working on the timestampdiff implementation for the past few days but have not been able to achieve the results completely, so I am looking for another pair of hands to help me sort out this issue 😁

Please have a look at some of the work done so far: https://github.com/dask-contrib/dask-sql/compare/main...rajagurunath:feature/timestampdiff

The problem:

The current implementation is able to compute timestampdiff for units like SECOND, MINUTE, HOUR, and DAY, but fails for MONTH, QUARTER, and YEAR.

Query plan for the SQL (micro/second, minute, hour, day): ✅

LogicalProject(
  ms=[*(CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 1000)):INTEGER, 1000000)],
  sec=[CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 1000)):INTEGER],
  minn=[CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 60000)):INTEGER],
  hr=[CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 3600000)):INTEGER],
  dayy=[CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 86400000)):INTEGER]
)
  LogicalTableScan(table=[[root, test]])
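For the units that do work, the plan maps cleanly onto column operations. A minimal pandas sketch of the DAY case (the column values here are illustrative, not from the issue's tables):

```python
import pandas as pd

t1 = pd.to_datetime(pd.Series(["2002-06-05", "2005-09-01"]))
t2 = pd.to_datetime(pd.Series(["2002-06-07", "2002-06-05"]))

# "-" on two timestamp columns yields a timedelta
delta = t2 - t1
# Reinterpret: view the interval as its underlying millisecond count
delta_ms = delta // pd.Timedelta(milliseconds=1)
# /INT with Calcite's constant (86400000 ms per day), then cast to int32
days = (delta_ms // 86400000).astype("int32")
```

The same pattern works for SECOND/MINUTE/HOUR because Calcite only changes the divisor constant.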

Query plan for the SQL (month, year): ❌

LogicalProject(
    monthh=[CAST(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0)))):INTEGER],
    yearr=[CAST(/INT(Reinterpret(-(CAST($0):TIMESTAMP(0), CAST($1):TIMESTAMP(0))), 12)):INTEGER]
)
  LogicalTableScan(table=[[root, test]])

As per the plan, we cast the two expressions --> subtract --> Reinterpret --> divide by the number given by Calcite (60000, 1000, etc.) --> cast into int32.

Since the divisor for YEAR and MONTH is very small, the quotient is huge, and Dask fails because it cannot cast that big floating-point number into int32.
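The overflow is easy to see with plain numbers. If the subtraction still yields milliseconds but the YEAR plan divides by 12 (expecting a month count), the result is far outside the int32 range (dates here are illustrative):

```python
import numpy as np
import pandas as pd

start = pd.Timestamp("2002-06-05")
end = pd.Timestamp("2005-09-01")

# Millisecond difference, as produced by the current cast-based Reinterpret stand-in
delta_ms = (end - start) // pd.Timedelta(milliseconds=1)

# For YEAR, Calcite emits /INT(..., 12), assuming the interval is already in
# months; with milliseconds the quotient cannot fit into int32
value = delta_ms // 12
print(value > np.iinfo(np.int32).max)  # prints True
```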

I think I am missing something obvious while implementing /INT and Reinterpret. I think we need to make use of some methods from the Java side as well to implement this functionality smoothly (to get the units, e.g. DAY, MINUTE, HOUR, from the Java side to the Python side? 🤔). Are there some other methods available on the Python/Dask side to implement this feature?

Please let me know if you want me to raise a WIP PR for this feature to discuss further.

Thanks in Advance

cheers!

jdye64 commented 3 years ago

@rajagurunath I think it would be good if you could open a WIP PR. That would allow us to discuss a little more and also take advantage of CI for testing theories and changes.

rajagurunath commented 3 years ago

Thanks a lot @jdye64, I have created a PR. Please review and let me know your suggestions/feedback.

ayushdg commented 2 years ago

I have been looking into this a bit more and updating #293 locally to work with the latest branch. On further inspection, it looks like the main issue with MONTH and YEAR comes from Calcite assuming a different time interval for the date subtraction than for the other time units. Currently we handle the SqlDatetimeSubtractionOperator the same as any regular subtraction operation, but I think we might need a way to extract the time units for this operation from Calcite as well, based on the docs here.
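If that reading of the Calcite docs is right, MONTH/YEAR datetime subtraction produces an INTERVAL MONTH (a count of months) rather than milliseconds, so the Python side would need calendar-month arithmetic instead of a millisecond division. A rough pandas sketch of the idea (the exact truncation semantics for partial months would still need to match Calcite):

```python
import pandas as pd

start = pd.to_datetime(pd.Series(["2002-06-05", "2003-12-03"]))
end = pd.to_datetime(pd.Series(["2005-09-01", "2002-06-05"]))

# INTERVAL MONTH as a signed count of calendar-month boundaries crossed;
# TIMESTAMPDIFF(YEAR, ...) would then apply /INT(..., 12) on this month count
months = (end.dt.year - start.dt.year) * 12 + (end.dt.month - start.dt.month)
```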

I'll look into this further to see how to get that from calcite.