dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

[BUG] Can't infer file type of table when passing directory name only #430

Open · randerzander opened 2 years ago

randerzander commented 2 years ago
from dask_sql import Context
import pandas as pd
import dask.dataframe as dd

c = Context()

pd.DataFrame({'id': [0, 1, 2]}).to_parquet('/data/test/part.0.parquet')

# this works
c.sql("""
CREATE OR REPLACE TABLE test WITH (
    location = '/data/test/*.parquet'
)
""")

# this fails
c.sql("""
CREATE OR REPLACE TABLE test WITH (
    location = '/data/test/'
)
""")

Trace:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/input_utils/location.py:53, in LocationInputPlugin.to_dc(self, input_item, table_name, format, gpu, **kwargs)
     52     else:
---> 53         read_function = getattr(dd, f"read_{format}")
     54 except AttributeError:

AttributeError: module 'dask.dataframe' has no attribute 'read_'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
Input In [77], in <cell line: 9>()
      5 c = Context()
      7 pd.DataFrame({'id': [0, 1, 2]}).to_parquet('/data/test/part.0.parquet')
----> 9 c.sql("""
     10 CREATE OR REPLACE TABLE test WITH (
     11     location = '/data/test/'
     12 )
     13 """)

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/context.py:467, in Context.sql(self, sql, return_futures, dataframes, gpu, config_options)
    463         self.create_table(df_name, df, gpu=gpu)
    465 rel, select_names, _ = self._get_ral(sql)
--> 467 dc = RelConverter.convert(rel, context=self)
    469 if dc is None:
    470     return

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:60, in RelConverter.convert(cls, rel, context)
     54     raise NotImplementedError(
     55         f"No conversion for class {class_name} available (yet)."
     56     )
     57 logger.debug(
     58     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59 )
---> 60 df = plugin_instance.convert(rel, context=context)
     61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62 return df

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/physical/rel/custom/create_table.py:71, in CreateTablePlugin.convert(self, sql, context)
     68     raise AttributeError("Parameters must include a 'location' parameter.")
     70 gpu = kwargs.pop("gpu", False)
---> 71 context.create_table(
     72     table_name,
     73     location,
     74     format=format,
     75     persist=persist,
     76     schema_name=schema_name,
     77     gpu=gpu,
     78     **kwargs,
     79 )

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/context.py:210, in Context.create_table(self, table_name, input_table, format, persist, schema_name, statistics, gpu, **kwargs)
    206     format = kwargs.pop("file_format")
    208 schema_name = schema_name or self.schema_name
--> 210 dc = InputUtil.to_dc(
    211     input_table,
    212     table_name=table_name,
    213     format=format,
    214     persist=persist,
    215     gpu=gpu,
    216     **kwargs,
    217 )
    218 self.schema[schema_name].tables[table_name.lower()] = dc
    219 if statistics:

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/input_utils/convert.py:64, in InputUtil.to_dc(cls, input_item, table_name, format, persist, gpu, **kwargs)
     62     table = dd.concat([filled_get_dask_dataframe(item) for item in input_item])
     63 else:
---> 64     table = filled_get_dask_dataframe(input_item)
     66 if persist:
     67     table = table.persist()

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/input_utils/convert.py:57, in InputUtil.to_dc.<locals>.<lambda>(*args)
     42 @classmethod
     43 def to_dc(
     44     cls,
   (...)
     50     **kwargs,
     51 ) -> DataContainer:
     52     """
     53     Turn possible input descriptions or formats (e.g. dask dataframes, pandas dataframes,
     54     locations as string, hive tables) into the loaded data containers,
     55     maybe persist them to cluster memory before.
     56     """
---> 57     filled_get_dask_dataframe = lambda *args: cls._get_dask_dataframe(
     58         *args, table_name=table_name, format=format, gpu=gpu, **kwargs,
     59     )
     61     if isinstance(input_item, list):
     62         table = dd.concat([filled_get_dask_dataframe(item) for item in input_item])

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/input_utils/convert.py:86, in InputUtil._get_dask_dataframe(cls, input_item, table_name, format, gpu, **kwargs)
     82 for plugin in plugin_list:
     83     if plugin.is_correct_input(
     84         input_item, table_name=table_name, format=format, **kwargs
     85     ):
---> 86         return plugin.to_dc(
     87             input_item, table_name=table_name, format=format, gpu=gpu, **kwargs
     88         )
     90 raise ValueError(f"Do not understand the input type {type(input_item)}")

File /conda/envs/dsql-3-09/lib/python3.9/site-packages/dask_sql/input_utils/location.py:55, in LocationInputPlugin.to_dc(self, input_item, table_name, format, gpu, **kwargs)
     53         read_function = getattr(dd, f"read_{format}")
     54 except AttributeError:
---> 55     raise AttributeError(f"Can not read files of format {format}")
     57 return read_function(input_item, **kwargs)

AttributeError: Can not read files of format 
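For reference, a workaround that should sidestep the inference entirely is to pass the format explicitly in the WITH clause (dask-sql's CREATE TABLE accepts a format parameter alongside location, and dd.read_parquet can read a directory path directly):

# works: directory location plus an explicit format
c.sql("""
CREATE OR REPLACE TABLE test WITH (
    location = '/data/test/',
    format = 'parquet'
)
""")
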
ayushdg commented 2 years ago

The current logic for inferring the file format, when it is not provided explicitly, just checks the file extension: https://github.com/dask-contrib/dask-sql/blob/c3ad6a9f6b01ce02127fde7501eaf322c8160f7e/dask_sql/input_utils/location.py#L41-L44
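For context, those linked lines amount to splitting the extension off the location string, roughly like this sketch (not the literal source):

import os

def infer_format(location: str) -> str:
    # rough sketch of the current behaviour: take whatever follows the last dot
    _, extension = os.path.splitext(location)
    return extension.lstrip(".")

print(infer_format("/data/test/*.parquet"))  # 'parquet'
print(infer_format("/data/test/"))           # '' -> getattr(dd, "read_") fails later
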

I believe we can improve the error message when the file path has no extension, and encourage users to provide the format explicitly.
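A minimal sketch of what that friendlier check could look like, building on the hypothetical infer_format helper above (not part of dask-sql):

def resolve_format(location: str) -> str:
    format = infer_format(location)
    if not format:
        raise ValueError(
            f"Could not infer a file format from '{location}'. "
            "Please pass it explicitly, e.g. WITH (location = ..., format = 'parquet')."
        )
    return format
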

We can also explore checking whether the input is a directory and, if so, trying to infer the format from one of the files inside it. That said, from an initial search, other frameworks usually also expect users to provide the file format during dataset creation.
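A possible sketch of that directory check (local paths only for brevity; a real implementation would go through fsspec, which dask already uses for remote filesystems):

import os

def infer_format_from_directory(location: str) -> str:
    if os.path.isdir(location):
        # collect the extensions of the files inside the directory
        extensions = {
            os.path.splitext(name)[1].lstrip(".")
            for name in os.listdir(location)
        }
        extensions.discard("")  # files without an extension don't tell us anything
        if len(extensions) == 1:
            return extensions.pop()
    # fall back to the existing extension check on the location itself
    return os.path.splitext(location)[1].lstrip(".")
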

randerzander commented 2 years ago

Since we're talking about improving this feature: I might be missing where the relevant logic would actually live, but it doesn't look like these checks handle Dask's _metadata or Spark's _SUCCESS files if they're present in the directory?

We can also explore checking whether the input is a directory and, if so, trying to infer the format from one of the files inside it.

This sounds like a nicer user experience than asking people to type .../*.parquet, which is about as much typing as including the `format = 'parquet'` argument.
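If directory-based inference is added, the bookkeeping files mentioned above would need to be skipped before picking a representative extension. In the sketch above they would already drop out because they have no extension, but an explicit skip list makes the intent clearer. A small sketch (local paths only, file names assumed):

import os

SIDECAR_FILES = {"_metadata", "_common_metadata", "_SUCCESS"}  # Dask/Spark bookkeeping files

def data_files(directory: str) -> list:
    # drop bookkeeping and hidden files before inferring a format from extensions
    return [
        name for name in os.listdir(directory)
        if name not in SIDECAR_FILES and not name.startswith(".")
    ]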