fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[BUG] `AttributeError` in `PandasDataFrame.__init__` with `triad>=0.9.2` #526

Open charlesbluca opened 7 months ago

charlesbluca commented 7 months ago

Minimal Code To Reproduce

import fugue_sql

dag = fugue_sql.FugueSQLWorkflow()
df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str")
dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result")

result = dag.run("dask")

Describe the bug

When pulling in triad>=0.9.2, the above reproducer fails due to a missing enforce_type attribute:

AttributeError                            Traceback (most recent call last)
Cell In [1], line 7
      4 df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str")
      5 dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result")
----> 7 result = dag.run("dask")

File /datasets/charlesb/miniforge3/envs/dask-sql-py38/lib/python3.8/site-packages/fugue/workflow/workflow.py:1523, in FugueWorkflow.run(self, *args, **kwargs)
   1521         if ctb is None:  # pragma: no cover
   1522             raise
-> 1523         raise ex.with_traceback(ctb)
   1524     self._computed = True
   1525 return DataFrames(
   1526     {
   1527         k: v.result
   (...)
   1530     }
   1531 )

Cell In [1], line 4
      1 import fugue_sql
      3 dag = fugue_sql.FugueSQLWorkflow()
----> 4 df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str")
      5 dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result")
      7 result = dag.run("dask")

File /datasets/charlesb/miniforge3/envs/dask-sql-py38/lib/python3.8/site-packages/fugue/dataframe/pandas_dataframe.py:64, in PandasDataFrame.__init__(self, df, schema, metadata, pandas_df_wrapper)
     62 schema = _input_schema(schema).assert_not_empty()
     63 pdf = pd.DataFrame(df, columns=schema.names)
---> 64 pdf = PD_UTILS.enforce_type(pdf, schema.pa_schema, null_safe=True)
     65 if PD_UTILS.empty(pdf):
     66     for k, v in schema.items():

AttributeError: 'PandasUtils' object has no attribute 'enforce_type'

Expected behavior

With triad=0.9.1, running the above workflow would succeed.

Environment (please complete the following information):
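
A small helper along these lines could collect the version information this section asks for. It is only a sketch: `installed_versions` is a hypothetical name, and the package list is an assumption based on the distributions mentioned in this thread. It relies solely on `importlib.metadata` from the standard library (Python 3.8+).

```python
from importlib import metadata  # standard library since Python 3.8


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Assumed package list: the distributions named in this thread.
    packages = ["fugue", "triad", "dask", "dask-sql", "pandas"]
    for name, version in installed_versions(packages).items():
        print(f"{name}=={version}" if version else f"{name}: not installed")
```

Pasting the printed lines into the report makes it easier to see which triad release was actually resolved.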

goodwanghan commented 7 months ago

Ah, sorry about that. I think the Fugue side has finished the change to depend on dask-sql when the backend is Dask, so I will make a PR to dask-sql to remove the unnecessary part.

paolorechia commented 7 months ago

Hey, I'm also affected by this bug, using the Spark backend.

Some relevant requirements:

adtk==0.6.2
pyod==1.0.1
numpy==1.22.3
fugue==0.8.3

My application suddenly stopped working from one deployment to the next. It seems that fugue's setup.py constrains its dependency versions in a rather risky way, with lower bounds only:

        "triad>=0.9.3",
        "adagio>=0.2.4",
        # sql dependencies
        "qpd>=0.4.4",
        "fugue-sql-antlr>=0.1.6",

So I'm guessing triad was updated, which broke my pinned version of fugue. I see two long-term solutions to this problem:

  1. Change the pinning strategy of the fugue project, to something like this:

        "triad<=0.9.3",
        "adagio<=0.2.4",
        # sql dependencies
        "qpd<=0.4.4",
        "fugue-sql-antlr<=0.1.6",

    This would ensure that sub-dependencies are only updated when fugue itself publishes a new version.

  2. Add a disclaimer to the installation steps stating that end users need to lock their requirements properly, for example with pipenv. The documentation currently recommends a plain pip install, which is a ticking time bomb: https://github.com/fugue-project/fugue#installation
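
To make the difference between the two bound styles concrete, here is a rough sketch of which triad versions each one admits. It is a simplification, not pip's resolver: `parse` and `allows` are hypothetical helpers that only handle plain dotted releases and the `>=` / `<=` operators, and 0.9.4 is a made-up release used for illustration.

```python
def parse(version):
    """Turn a plain dotted release like '0.9.3' into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def allows(specifier, version):
    """Check one '>=X' or '<=X' clause; other operators are out of scope here."""
    operator, bound = specifier[:2], specifier[2:]
    if operator == ">=":
        return parse(version) >= parse(bound)
    if operator == "<=":
        return parse(version) <= parse(bound)
    raise ValueError(f"unsupported specifier: {specifier}")


# 0.9.1 to 0.9.3 appear in this thread; 0.9.4 is hypothetical.
candidates = ["0.9.1", "0.9.2", "0.9.3", "0.9.4"]

# Lower bound only: any future release is admitted automatically.
floor_only = [v for v in candidates if allows(">=0.9.3", v)]

# Upper bound only: future releases are excluded, older ones still match.
capped = [v for v in candidates if allows("<=0.9.3", v)]

print(floor_only)  # ['0.9.3', '0.9.4']
print(capped)      # ['0.9.1', '0.9.2', '0.9.3']
```

Since pip picks the newest version that satisfies the constraints by default, the lower-bound-only form is the one that silently pulls in a new triad release on the next fresh install.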