dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License
397 stars 72 forks source link

[DF] Some grouped aggregations fail #684

Closed — randerzander closed this issue 2 years ago

randerzander commented 2 years ago

Repro:

import pandas as pd
from dask_sql import Context

c = Context()

df = pd.DataFrame({"id": [0, 1, 1, 2], "val": [1, 1, 2, 1]})

c.create_table("df", df)
c.sql("""
SELECT
  val,
  COUNT(distinct(id))
FROM df
GROUP BY val
""")

Trace:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Input In [39], in <cell line: 3>()
      1 df = pd.DataFrame({"id": [0, 1, 1, 2], "val": [1, 1, 2, 1]})
      2 c.create_table("df", df)
----> 3 c.sql("""
      4 SELECT
      5   val,
      6   COUNT(distinct(id))
      7 FROM df
      8 GROUP BY val
      9 """)

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/context.py:495, in Context.sql(self, sql, return_futures, dataframes, gpu, config_options)
    491         self.create_table(df_name, df, gpu=gpu)
    493 rel, _ = self._get_ral(sql)
--> 495 return self._compute_table_from_rel(rel, return_futures)

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/context.py:819, in Context._compute_table_from_rel(self, rel, return_futures)
    818 def _compute_table_from_rel(self, rel: "LogicalPlan", return_futures: bool = True):
--> 819     dc = RelConverter.convert(rel, context=self)
    821     # Optimization might remove some alias projects. Make sure to keep them here.
    822     select_names = [field for field in rel.getRowType().getFieldList()]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/project.py:28, in DaskProjectPlugin.convert(self, rel, context)
     26 def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
     27     # Get the input of the previous step
---> 28     (dc,) = self.assert_inputs(rel, 1, context)
     30     df = dc.df
     31     cc = dc.column_container

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in BaseRelPlugin.assert_inputs(rel, n, context)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in <listcomp>(.0)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/project.py:28, in DaskProjectPlugin.convert(self, rel, context)
     26 def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
     27     # Get the input of the previous step
---> 28     (dc,) = self.assert_inputs(rel, 1, context)
     30     df = dc.df
     31     cc = dc.column_container

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in BaseRelPlugin.assert_inputs(rel, n, context)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in <listcomp>(.0)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/aggregate.py:145, in DaskAggregatePlugin.convert(self, rel, context)
    144 def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
--> 145     (dc,) = self.assert_inputs(rel, 1, context)
    147     agg = rel.aggregate()
    149     df = dc.df

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in BaseRelPlugin.assert_inputs(rel, n, context)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/base.py:84, in <listcomp>(.0)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/aggregate.py:159, in DaskAggregatePlugin.convert(self, rel, context)
    153 cc = cc.make_unique()
    155 group_exprs = agg.getGroupSets()
    156 group_columns = (
    157     agg.getDistinctColumns()
    158     if agg.isDistinct()
--> 159     else [group_expr.column_name(rel) for group_expr in group_exprs]
    160 )
    162 dc = DataContainer(df, cc)
    164 if not group_columns:
    165     # There was actually no GROUP BY specified in the SQL
    166     # Still, this plan can also be used if we need to aggregate something over the full
    167     # data sample
    168     # To reuse the code, we just create a new column at the end with a single value

File ~/.conda/envs/dsql/lib/python3.9/site-packages/dask_sql/physical/rel/logical/aggregate.py:159, in <listcomp>(.0)
    153 cc = cc.make_unique()
    155 group_exprs = agg.getGroupSets()
    156 group_columns = (
    157     agg.getDistinctColumns()
    158     if agg.isDistinct()
--> 159     else [group_expr.column_name(rel) for group_expr in group_exprs]
    160 )
    162 dc = DataContainer(df, cc)
    164 if not group_columns:
    165     # There was actually no GROUP BY specified in the SQL
    166     # Still, this plan can also be used if we need to aggregate something over the full
    167     # data sample
    168     # To reuse the code, we just create a new column at the end with a single value

RuntimeError: SchemaError(FieldNotFound { qualifier: Some("df"), name: "id", valid_fields: Some(["df.val", "alias1"]) })
ayushdg commented 2 years ago

Not sure if it's the same error, but we should revisit after the fix for #532 lands.

randerzander commented 2 years ago

This has been resolved.