dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License
397 stars 72 forks source link

[ENH] Support for to_timestamp #831

Closed beckernick closed 1 year ago

beckernick commented 2 years ago

I'd like to be able to convert data representing time since the UNIX epoch to an explicit timestamp format with to_timestamp, like I can in Spark SQL and PostgreSQL.

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder \
    .master("local") \
    .getOrCreate()

df = pd.DataFrame({
    "a": [1406073600, 1406073600, 1406073600],
})

sdf = spark.createDataFrame(df)
sdf.createOrReplaceTempView("df")

query = """
SELECT to_timestamp(a) as date
FROM df
"""
spark.sql(query).show()
+-------------------+
|               date|
+-------------------+
|2014-07-22 20:00:00|
|2014-07-22 20:00:00|
|2014-07-22 20:00:00|
+-------------------+
import pandas as pd
from dask_sql import Context
import dask

c = Context()

c.create_table("df", df)

query = """
SELECT to_timestamp(a) as date
FROM df
"""
print(c.sql(query).compute())
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:1000, in RexCallPlugin.convert(self, rel, expr, dc, context)
    999 try:
-> 1000     operation = self.OPERATION_MAPPING[operator_name]
   1001 except KeyError:

KeyError: 'totimestamp'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:1003, in RexCallPlugin.convert(self, rel, expr, dc, context)
   1002 try:
-> 1003     operation = context.schema[schema_name].functions[operator_name]
   1004 except KeyError:  # pragma: no cover

KeyError: 'totimestamp'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
Input In [5], in <cell line: 13>()
      7 c.create_table("df", df)
      9 query = """
     10 SELECT to_timestamp(a) as date
     11 FROM df
     12 """
---> 13 print(c.sql(query).compute())

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/context.py:501, in Context.sql(self, sql, return_futures, dataframes, gpu, config_options)
    496 else:
    497     raise RuntimeError(
    498         f"Encountered unsupported `LogicalPlan` sql type: {type(sql)}"
    499     )
--> 501 return self._compute_table_from_rel(rel, return_futures)

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/context.py:830, in Context._compute_table_from_rel(self, rel, return_futures)
    829 def _compute_table_from_rel(self, rel: "LogicalPlan", return_futures: bool = True):
--> 830     dc = RelConverter.convert(rel, context=self)
    832     # Optimization might remove some alias projects. Make sure to keep them here.
    833     select_names = [field for field in rel.getRowType().getFieldList()]

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/project.py:57, in DaskProjectPlugin.convert(self, rel, context)
     55 else:
     56     random_name = new_temporary_column(df)
---> 57     new_columns[random_name] = RexConverter.convert(
     58         rel, expr, dc, context=context
     59     )
     60     logger.debug(f"Adding a new column {key} out of {expr}")
     61     new_mappings[key] = random_name

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:73, in RexConverter.convert(cls, rel, rex, dc, context)
     65     raise NotImplementedError(
     66         f"No conversion for class {expr_type} available (yet)."
     67     )
     69 logger.debug(
     70     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     71 )
---> 73 df = plugin_instance.convert(rel, rex, dc, context=context)
     74 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     75 return df

File ~/miniconda3/envs/rapids-22.10-dasksql/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:1005, in RexCallPlugin.convert(self, rel, expr, dc, context)
   1003         operation = context.schema[schema_name].functions[operator_name]
   1004     except KeyError:  # pragma: no cover
-> 1005         raise NotImplementedError(f"{operator_name} not (yet) implemented")
   1007 logger.debug(
   1008     f"Executing {operator_name} on {[str(LoggableDataFrame(df)) for df in operands]}"
   1009 )
   1011 kwargs = {}

NotImplementedError: totimestamp not (yet) implemented
randerzander commented 2 years ago

@sarahyurick are you able to pick this up?