dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License
390 stars 72 forks source link

[ENH] Support timestampdiff #411

Open randerzander opened 2 years ago

randerzander commented 2 years ago

Following the Calcite docs, I was confused about how to compute the difference between two timestamps. I found TIMESTAMPDIFF(timeUnit, datetime, datetime2), but that doesn't seem to work:

import pandas as pd
from dask_sql import Context

c = Context()

df = pd.DataFrame({'dt0': ['2022-03-01 12:00:00'], 'dt1': ['2022-03-01 13:00:00']})
df['dt0'] = df['dt0'].astype('datetime64[s]')
df['dt1'] = df['dt1'].astype('datetime64[s]')

c.create_table('test_dt', df)

c.sql("select timestampdiff(SECOND, dt1, dt0) from test_dt").compute()

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:818, in RexCallPlugin.convert(self, rex, dc, context)
    817 try:
--> 818     operation = self.OPERATION_MAPPING[operator_name]
    819 except KeyError:

KeyError: 'reinterpret'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:821, in RexCallPlugin.convert(self, rex, dc, context)
    820 try:
--> 821     operation = context.schema[schema_name].functions[operator_name]
    822 except KeyError:  # pragma: no cover

KeyError: 'reinterpret'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
Input In [101], in <module>
      5 df['dt1'] = df['dt1'].astype('datetime64[s]')
      7 c.create_table('test_dt', df)
----> 9 c.sql("select timestampdiff(SECOND, dt1, dt0) from test_dt").compute()

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/context.py:460, in Context.sql(self, sql, return_futures, dataframes, gpu)
    456         self.create_table(df_name, df, gpu=gpu)
    458 rel, select_names, _ = self._get_ral(sql)
--> 460 dc = RelConverter.convert(rel, context=self)
    462 if dc is None:
    463     return

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:60, in RelConverter.convert(cls, rel, context)
     54     raise NotImplementedError(
     55         f"No conversion for class {class_name} available (yet)."
     56     )
     57 logger.debug(
     58     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59 )
---> 60 df = plugin_instance.convert(rel, context=context)
     61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rel/logical/project.py:55, in DaskProjectPlugin.convert(self, rel, context)
     53 else:
     54     random_name = new_temporary_column(df)
---> 55     new_columns[random_name] = RexConverter.convert(
     56         expr, dc, context=context
     57     )
     58     logger.debug(f"Adding a new column {key} out of {expr}")
     59     new_mappings[key] = random_name

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:809, in RexCallPlugin.convert(self, rex, dc, context)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
--> 809     operands = [
    810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:810, in <listcomp>(.0)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
    809     operands = [
--> 810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:809, in RexCallPlugin.convert(self, rex, dc, context)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
--> 809     operands = [
    810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:810, in <listcomp>(.0)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
    809     operands = [
--> 810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:823, in RexCallPlugin.convert(self, rex, dc, context)
    821         operation = context.schema[schema_name].functions[operator_name]
    822     except KeyError:  # pragma: no cover
--> 823         raise NotImplementedError(f"{operator_name} not (yet) implemented")
    825 logger.debug(
    826     f"Executing {operator_name} on {[str(LoggableDataFrame(df)) for df in operands]}"
    827 )
    829 kwargs = {}

NotImplementedError: reinterpret not (yet) implemented

The Calcite docs indicate that timestampdiff is equivalent to (timestamp2 - timestamp1) timeUnit. That form parses and executes, but the timeUnit appears to be ignored: below I get a raw interval (a timedelta) regardless of which timeUnit I pass, rather than a numeric count of seconds:

c.sql("select (dt1-dt0) SECOND from test_dt").compute()

    ("test_dt"."dt1" - "test_dt"."dt0") SECOND
0   0 days 01:00:00

cc @ayushdg , @jdye64

charlesbluca commented 2 years ago

Is this request covered by https://github.com/dask-contrib/dask-sql/issues/242? If so, it looks like there was an attempt to pick this up in #293, but I'm not sure whether that's ready for review (cc @rajagurunath)