dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License
390 stars 72 forks source link

[ENH] Support INTERSECT operator #319

Open randerzander opened 2 years ago

randerzander commented 2 years ago

Sometimes, instead of using a JOIN, an INTERSECT is used to find the overlap between two sets of records:

import pandas as pd

df_a = pd.DataFrame({'id': [0, 1, 2]})
df_b = pd.DataFrame({'id': [2]})

c.create_table('table_a', df_a)
c.create_table('table_b', df_b)
c.sql("select * from table_a intersect select * from table_b")

#expected:
2

Result:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     51         try:
---> 52             plugin_instance = cls.get_plugin(class_name)
     53         except KeyError:  # pragma: no cover

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/utils.py in get_plugin(cls, name)
     73         """Get a plugin with the given name"""
---> 74         return Pluggable.__plugins[cls][name]
     75 

KeyError: 'org.apache.calcite.rel.logical.LogicalIntersect'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
/tmp/ipykernel_628704/213746317.py in <module>
      6 c.create_table('table_a', df_a)
      7 c.create_table('table_b', df_b)
----> 8 c.sql("select * from table_a intersect select * from table_b")

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    436         rel, select_names, _ = self._get_ral(sql)
    437 
--> 438         dc = RelConverter.convert(rel, context=self)
    439 
    440         if dc is None:

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     58             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59         )
---> 60         df = plugin_instance.convert(rel, context=context)
     61         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62         return df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     27     ) -> DataContainer:
     28         # Get the input of the previous step
---> 29         (dc,) = self.assert_inputs(rel, 1, context)
     30 
     31         df = dc.df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     52             plugin_instance = cls.get_plugin(class_name)
     53         except KeyError:  # pragma: no cover
---> 54             raise NotImplementedError(
     55                 f"No conversion for class {class_name} available (yet)."
     56             )

NotImplementedError: No conversion for class org.apache.calcite.rel.logical.LogicalIntersect available (yet).

This could possibly be implemented by use of Dask DataFrame's merge operator.

jdye64 commented 2 years ago

@randerzander while this is not in the main branch, it is supported in the datafusion-sql-planner branch, which will eventually be merged into main. Does that meet your needs well enough that we could close this issue?